summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-25 16:18:26 +0300
committerGitHub <[email protected]>2021-01-25 16:18:26 +0300
commitf1875f5715bf7635e17697ae3513ba3d21e4e524 (patch)
treef9c0c3876ef542217a8bd7ff17f90bffc018132f /pkg/pool
parenta063ad05b1cab8ec71eecc32f836efa4d431c6b8 (diff)
parent99bf203511b8af4be37186017e2e0c73a030d4f3 (diff)
Merge pull request #429 from spiral/2.0
Release 2.0-dev
Diffstat (limited to 'pkg/pool')
-rw-r--r--pkg/pool/config.go75
-rw-r--r--pkg/pool/interface.go29
-rwxr-xr-xpkg/pool/static_pool.go327
-rwxr-xr-xpkg/pool/static_pool_test.go647
-rwxr-xr-xpkg/pool/supervisor_pool.go222
-rw-r--r--pkg/pool/supervisor_test.go248
6 files changed, 1548 insertions, 0 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
new file mode 100644
index 00000000..782f7ce9
--- /dev/null
+++ b/pkg/pool/config.go
@@ -0,0 +1,75 @@
+package pool
+
+import (
+ "runtime"
+ "time"
+)
+
+// Configures the pool behaviour.
+type Config struct {
+ // Debug flag creates new fresh worker before every request.
+ Debug bool
+
+ // NumWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
+ NumWorkers uint64 `mapstructure:"num_workers"`
+
+ // MaxJobs defines how many executions is allowed for the worker until
+ // it's destruction. set 1 to create new process for each new task, 0 to let
+ // worker handle as many tasks as it can.
+ MaxJobs uint64 `mapstructure:"max_jobs"`
+
+ // AllocateTimeout defines for how long pool will be waiting for a worker to
+ // be freed to handle the task. Defaults to 60s.
+ AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`
+
+ // DestroyTimeout defines for how long pool should be waiting for worker to
+ // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
+ DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`
+
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor *SupervisorConfig `mapstructure:"supervisor"`
+}
+
+// InitDefaults enables default config values.
+func (cfg *Config) InitDefaults() {
+ if cfg.NumWorkers == 0 {
+ cfg.NumWorkers = uint64(runtime.NumCPU())
+ }
+
+ if cfg.AllocateTimeout == 0 {
+ cfg.AllocateTimeout = time.Minute
+ }
+
+ if cfg.DestroyTimeout == 0 {
+ cfg.DestroyTimeout = time.Minute
+ }
+ if cfg.Supervisor == nil {
+ return
+ }
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick time.Duration `mapstructure:"watch_tick"`
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL time.Duration `mapstructure:"ttl"`
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL time.Duration `mapstructure:"idle_ttl"`
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL time.Duration `mapstructure:"exec_ttl"`
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = time.Second
+ }
+}
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
new file mode 100644
index 00000000..4f7ae595
--- /dev/null
+++ b/pkg/pool/interface.go
@@ -0,0 +1,29 @@
+package pool
+
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// Pool managed set of inner worker processes.
+type Pool interface {
+ // GetConfig returns pool configuration.
+ GetConfig() interface{}
+
+ // Exec executes task with payload
+ Exec(rqs payload.Payload) (payload.Payload, error)
+
+ // ExecWithContext executes task with context which is used with timeout
+ ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+
+ // Workers returns worker list associated with the pool.
+ Workers() (workers []worker.SyncWorker)
+
+ // Remove worker from the pool.
+ RemoveWorker(worker worker.SyncWorker) error
+
+ // Destroy all underlying stack (but let them to complete the task).
+ Destroy(ctx context.Context)
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
new file mode 100755
index 00000000..44adf9c0
--- /dev/null
+++ b/pkg/pool/static_pool.go
@@ -0,0 +1,327 @@
+package pool
+
+import (
+ "context"
+ "os/exec"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/transport"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
+)
+
+// StopRequest can be sent by worker to indicate that restart is required.
+const StopRequest = "{\"stop\":true}"
+
+// ErrorEncoder encode error or make a decision based on the error type
+type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error)
+
+type Options func(p *StaticPool)
+
+type Command func() *exec.Cmd
+
+// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
+type StaticPool struct {
+ cfg Config
+
+ // worker command creator
+ cmd Command
+
+ // creates and connects to stack
+ factory transport.Factory
+
+ // distributes the events
+ events events.Handler
+
+ // saved list of event listeners
+ listeners []events.Listener
+
+ // manages worker states and TTLs
+ ww workerWatcher.Watcher
+
+ // allocate new worker
+ allocator worker.Allocator
+
+ // errEncoder is the default Exec error encoder
+ errEncoder ErrorEncoder
+}
+
+// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) {
+ const op = errors.Op("static_pool_initialize")
+ if factory == nil {
+ return nil, errors.E(op, errors.Str("no factory initialized"))
+ }
+ cfg.InitDefaults()
+
+ if cfg.Debug {
+ cfg.NumWorkers = 0
+ cfg.MaxJobs = 1
+ }
+
+ p := &StaticPool{
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
+ events: events.NewEventsHandler(),
+ }
+
+ // add pool options
+ for i := 0; i < len(options); i++ {
+ options[i](p)
+ }
+
+ p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+
+ workers, err := p.allocateWorkers(p.cfg.NumWorkers)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // put stack in the pool
+ err = p.ww.AddToWatch(workers)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ p.errEncoder = defaultErrEncoder(p)
+
+ // if supervised config not nil, guess, that pool wanted to be supervised
+ if cfg.Supervisor != nil {
+ sp := supervisorWrapper(p, p.events, p.cfg.Supervisor)
+ // start watcher timer
+ sp.Start()
+ return sp, nil
+ }
+
+ return p, nil
+}
+
+func AddListeners(listeners ...events.Listener) Options {
+ return func(p *StaticPool) {
+ p.listeners = listeners
+ for i := 0; i < len(listeners); i++ {
+ p.addListener(listeners[i])
+ }
+ }
+}
+
+// AddListener connects event listener to the pool.
+func (sp *StaticPool) addListener(listener events.Listener) {
+ sp.events.AddListener(listener)
+}
+
+// Config returns associated pool configuration. Immutable.
+func (sp *StaticPool) GetConfig() interface{} {
+ return sp.cfg
+}
+
+// Workers returns worker list associated with the pool.
+func (sp *StaticPool) Workers() (workers []worker.SyncWorker) {
+ return sp.ww.WorkersList()
+}
+
+func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
+ return sp.ww.RemoveWorker(wb)
+}
+
+// Be careful, sync Exec with ExecWithContext
+func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("static_pool_exec")
+ if sp.cfg.Debug {
+ return sp.execDebug(p)
+ }
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ rsp, err := w.Exec(p)
+ if err != nil {
+ return sp.errEncoder(err, w)
+ }
+
+ // worker want's to be terminated
+ // TODO careful with string(rsp.Context)
+ if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ sp.stopWorker(w)
+
+ return sp.Exec(p)
+ }
+
+ err = sp.checkMaxJobs(w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ return rsp, nil
+}
+
+// Be careful, sync with pool.Exec method
+func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("static_pool_exec_with_context")
+ ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ rsp, err := w.ExecWithTimeout(ctx, p)
+ if err != nil {
+ return sp.errEncoder(err, w)
+ }
+
+ // worker want's to be terminated
+ if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ sp.stopWorker(w)
+ return sp.ExecWithContext(ctx, p)
+ }
+
+ err = sp.checkMaxJobs(w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ return rsp, nil
+}
+
+func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
+ const op = errors.Op("static_pool_stop_worker")
+ w.State().Set(internal.StateInvalid)
+ err := w.Stop()
+ if err != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ }
+}
+
+// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
+func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
+ const op = errors.Op("static_pool_check_max_jobs")
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ } else {
+ sp.ww.PushWorker(w)
+ }
+ return nil
+}
+
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
+ // GetFreeWorker function consumes context with timeout
+ w, err := sp.ww.GetFreeWorker(ctxGetFree)
+ if err != nil {
+ // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
+ if errors.Is(errors.NoFreeWorkers, err) {
+ sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)})
+ return nil, errors.E(op, err)
+ }
+ // else if err not nil - return error
+ return nil, errors.E(op, err)
+ }
+ return w, nil
+}
+
+// Destroy all underlying stack (but let them to complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.ww.Destroy(ctx)
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+ return func(err error, w worker.SyncWorker) (payload.Payload, error) {
+ const op = errors.Op("error encoder")
+ // just push event if on any stage was timeout error
+ if errors.Is(errors.ExecTTL, err) {
+ sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
+ }
+ // soft job errors are allowed
+ if errors.Is(errors.SoftJob, err) {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew()
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
+ }
+
+ w.State().Set(internal.StateInvalid)
+ err = w.Stop()
+ if err != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ }
+ } else {
+ sp.ww.PushWorker(w)
+ }
+
+ return payload.Payload{}, errors.E(op, err)
+ }
+
+ w.State().Set(internal.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ errS := w.Stop()
+
+ if errS != nil {
+ return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
+ }
+
+ return payload.Payload{}, errors.E(op, err)
+ }
+}
+
+func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
+ return func() (*worker.SyncWorkerImpl, error) {
+ ctx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
+ if err != nil {
+ return nil, err
+ }
+
+ sw := worker.From(w)
+
+ sp.events.Push(events.PoolEvent{
+ Event: events.EventWorkerConstruct,
+ Payload: sw,
+ })
+ return sw, nil
+ }
+}
+
+func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
+ sw, err := sp.allocator()
+ if err != nil {
+ return payload.Payload{}, err
+ }
+
+ r, err := sw.Exec(p)
+
+ if stopErr := sw.Stop(); stopErr != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ }
+
+ return r, err
+}
+
+// allocate required number of stack
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) {
+ const op = errors.Op("allocate workers")
+ var workers []worker.SyncWorker
+
+ // constant number of stack simplify logic
+ for i := uint64(0); i < numWorkers; i++ {
+ w, err := sp.allocator()
+ if err != nil {
+ return nil, errors.E(op, errors.WorkerAllocate, err)
+ }
+
+ workers = append(workers, w)
+ }
+ return workers, nil
+}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
new file mode 100755
index 00000000..a32790e0
--- /dev/null
+++ b/pkg/pool/static_pool_test.go
@@ -0,0 +1,647 @@
+package pool
+
+import (
+ "context"
+ "log"
+ "os/exec"
+ "runtime"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/stretchr/testify/assert"
+)
+
+var cfg = Config{
+ NumWorkers: uint64(runtime.NumCPU()),
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
+}
+
+func Test_NewPool(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+}
+
+func Test_StaticPool_Invalid(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+
+ assert.Nil(t, p)
+ assert.Error(t, err)
+}
+
+func Test_ConfigNoErrorInitDefaults(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+}
+
+func Test_StaticPool_Echo(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_StaticPool_Echo_NilContext(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_StaticPool_Echo_Context(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.Empty(t, res.Body)
+ assert.NotNil(t, res.Context)
+
+ assert.Equal(t, "world", string(res.Context))
+}
+
+func Test_StaticPool_JobError(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ if errors.Is(errors.SoftJob, err) == false {
+ t.Fatal("error should be of type errors.Exec")
+ }
+
+ assert.Contains(t, err.Error(), "hello")
+}
+
+func Test_StaticPool_Broken_Replace(t *testing.T) {
+ ctx := context.Background()
+ block := make(chan struct{}, 1)
+
+ listener := func(event interface{}) {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ if wev.Event == events.EventWorkerLog {
+ e := string(wev.Payload.([]byte))
+ if strings.ContainsAny(e, "undefined_function()") {
+ block <- struct{}{}
+ return
+ }
+ }
+ }
+ }
+
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ time.Sleep(time.Second)
+ res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+
+ <-block
+
+ p.Destroy(ctx)
+}
+
+func Test_StaticPool_Broken_FromOutside(t *testing.T) {
+ ctx := context.Background()
+ // Consume pool events
+ ev := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if pe, ok := event.(events.PoolEvent); ok {
+ if pe.Event == events.EventWorkerConstruct {
+ ev <- struct{}{}
+ }
+ }
+ }
+
+ var cfg = Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
+ }
+
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+ assert.Equal(t, 1, len(p.Workers()))
+
+ // first creation
+ <-ev
+ // killing random worker and expecting pool to replace it
+ err = p.Workers()[0].Kill()
+ if err != nil {
+ t.Errorf("error killing the process: error %v", err)
+ }
+
+ // re-creation
+ <-ev
+
+ list := p.Workers()
+ for _, w := range list {
+ assert.Equal(t, internal.StateReady, w.State().Value())
+ }
+}
+
+func Test_StaticPool_AllocateTimeout(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Nanosecond * 1,
+ DestroyTimeout: time.Second * 2,
+ },
+ )
+ assert.Error(t, err)
+ if !errors.Is(errors.WorkerAllocate, err) {
+ t.Fatal("error should be of type WorkerAllocate")
+ }
+ assert.Nil(t, p)
+}
+
+func Test_StaticPool_Replace_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ MaxJobs: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ var lastPID string
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
+
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+func Test_StaticPool_Debug_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ Debug: true,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ assert.Len(t, p.Workers(), 0)
+
+ var lastPID string
+ res, _ := p.Exec(payload.Payload{Body: []byte("hello")})
+ assert.NotEqual(t, lastPID, string(res.Body))
+
+ assert.Len(t, p.Workers(), 0)
+
+ for i := 0; i < 10; i++ {
+ assert.Len(t, p.Workers(), 0)
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+// identical to replace but controlled on worker side
+func Test_StaticPool_Stop_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ var lastPID string
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
+
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ p.Destroy(ctx)
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ assert.Error(t, err)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ go func() {
+ _, err := p.Exec(payload.Payload{Body: []byte("100")})
+ if err != nil {
+ t.Errorf("error executing payload: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ p.Destroy(ctx)
+ _, err = p.Exec(payload.Payload{Body: []byte("100")})
+ assert.Error(t, err)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Handle_Dead(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ for i := range p.Workers() {
+ p.Workers()[i].State().Set(internal.StateErrored)
+ }
+
+ _, err = p.Exec(payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ p.Destroy(ctx)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Slow_Destroy(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ p.Destroy(context.Background())
+}
+
+func Test_StaticPool_NoFreeWorkers(t *testing.T) {
+ ctx := context.Background()
+ block := make(chan struct{}, 1)
+
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventNoFreeWorkers {
+ block <- struct{}{}
+ }
+ }
+ }
+
+ p, err := Initialize(
+ ctx,
+ // sleep for the 3 seconds
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ Debug: false,
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: nil,
+ },
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ go func() {
+ _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ }()
+
+ time.Sleep(time.Second)
+ res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+
+ <-block
+
+ p.Destroy(ctx)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_WrongCommand1(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_WrongCommand2(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
+func Benchmark_Pool_Echo(b *testing.B) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+//
+func Benchmark_Pool_Echo_Batched(b *testing.B) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: uint64(runtime.NumCPU()),
+ AllocateTimeout: time.Second * 100,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(b, err)
+ defer p.Destroy(ctx)
+
+ var wg sync.WaitGroup
+ for i := 0; i < b.N; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ log.Println(err)
+ }
+ }()
+ }
+
+ wg.Wait()
+}
+
+//
+func Benchmark_Pool_Echo_Replaced(b *testing.B) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ MaxJobs: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(b, err)
+ defer p.Destroy(ctx)
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ log.Println(err)
+ }
+ }
+}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
new file mode 100755
index 00000000..2597b352
--- /dev/null
+++ b/pkg/pool/supervisor_pool.go
@@ -0,0 +1,222 @@
+package pool
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/tools"
+)
+
+const MB = 1024 * 1024
+
+// NSEC_IN_SEC nanoseconds in second
+const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
+
+type Supervised interface {
+ Pool
+ // Start used to start watching process for all pool workers
+ Start()
+}
+
+type supervised struct {
+ cfg *SupervisorConfig
+ events events.Handler
+ pool Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
+}
+
+func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+ sp := &supervised{
+ cfg: cfg,
+ events: events,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
+ }
+
+ return sp
+}
+
+type ttlExec struct {
+ err error
+ p payload.Payload
+}
+
+func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("supervised_exec_with_context")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ c := make(chan ttlExec, 1)
+ ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
+ defer cancel()
+ go func() {
+ res, err := sp.pool.ExecWithContext(ctx, rqs)
+ if err != nil {
+ c <- ttlExec{
+ err: errors.E(op, err),
+ p: payload.Payload{},
+ }
+ }
+
+ c <- ttlExec{
+ err: nil,
+ p: res,
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return payload.Payload{}, res.err
+ }
+
+ return res.p, nil
+ }
+ }
+}
+
+func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("supervised_exec")
+ rsp, err := sp.pool.Exec(p)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
+ }
+ return rsp, nil
+}
+
+func (sp *supervised) GetConfig() interface{} {
+ return sp.pool.GetConfig()
+}
+
+func (sp *supervised) Workers() (workers []worker.SyncWorker) {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Workers()
+}
+
+func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error {
+ return sp.pool.RemoveWorker(worker)
+}
+
+func (sp *supervised) Destroy(ctx context.Context) {
+ sp.pool.Destroy(ctx)
+}
+
+func (sp *supervised) Start() {
+ go func() {
+ watchTout := time.NewTicker(sp.cfg.WatchTick)
+ for {
+ select {
+ case <-sp.stopCh:
+ watchTout.Stop()
+ return
+ // stop here
+ case <-watchTout.C:
+ sp.mu.Lock()
+ sp.control()
+ sp.mu.Unlock()
+ }
+ }
+ }()
+}
+
+func (sp *supervised) Stop() {
+ sp.stopCh <- struct{}{}
+}
+
+func (sp *supervised) control() {
+ now := time.Now()
+ const op = errors.Op("supervised_pool_control_tick")
+
+ // THIS IS A COPY OF WORKERS
+ workers := sp.pool.Workers()
+
+ for i := 0; i < len(workers); i++ {
+ if workers[i].State().Value() == internal.StateInvalid {
+ continue
+ }
+
+ s, err := tools.WorkerProcessState(workers[i])
+ if err != nil {
+ // worker not longer valid for supervision
+ continue
+ }
+
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+ continue
+ }
+
+ if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+ continue
+ }
+
+ // firs we check maxWorker idle
+ if sp.cfg.IdleTTL != 0 {
+ // then check for the worker state
+ if workers[i].State().Value() != internal.StateReady {
+ continue
+ }
+
+ /*
+ Calculate idle time
+ If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
+ 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
+ we are guessing that worker overlap idle time and has to be killed
+ */
+
+ // 1610530005534416045 lu
+ // lu - now = -7811150814 - nanoseconds
+ // 7.8 seconds
+ // get last used unix nano
+ lu := workers[i].State().LastUsed()
+ // worker not used, skip
+ if lu == 0 {
+ continue
+ }
+
+ // convert last used to unixNano and sub time.now to seconds
+ // negative number, because lu always in the past, except for the `back to the future` :)
+ res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1
+
+ // maxWorkerIdle more than diff between now and last used
+ // for example:
+ // After exec worker goes to the rest
+ // And resting for the 5 seconds
+ // IdleTTL is 1 second.
+ // After the control check, res will be 5, idle is 1
+ // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
+ if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+ }
+ }
+ }
+}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
new file mode 100644
index 00000000..c67d5d91
--- /dev/null
+++ b/pkg/pool/supervisor_test.go
@@ -0,0 +1,248 @@
+package pool
+
+import (
+ "context"
+ "os/exec"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/tools"
+ "github.com/stretchr/testify/assert"
+)
+
+var cfgSupervised = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 100 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+}
+
+func TestSupervisedPool_Exec(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgSupervised,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ stopCh := make(chan struct{})
+ defer p.Destroy(context.Background())
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ workers := p.Workers()
+ if len(workers) > 0 {
+ s, err := tools.WorkerProcessState(workers[0])
+ assert.NoError(t, err)
+ assert.NotNil(t, s)
+ // since this is soft limit, double max memory limit watch
+ if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 {
+ assert.Fail(t, "max memory reached")
+ }
+ }
+ }
+ }
+ }()
+
+ for i := 0; i < 100; i++ {
+ time.Sleep(time.Millisecond * 50)
+ _, err = p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+ assert.NoError(t, err)
+ }
+
+ stopCh <- struct{}{}
+}
+
+func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 1 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.ExecWithContext(context.Background(), payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Error(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 1)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_Idle(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 1 * time.Second,
+ ExecTTL: 100 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.ExecWithContext(context.Background(), payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Nil(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 5)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ p.Destroy(context.Background())
+}
+
+func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 1)
+ // should be the same pid
+ assert.Equal(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
+ MaxWorkerMemory: 1,
+ },
+ }
+
+ block := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventMaxMemory {
+ block <- struct{}{}
+ }
+ }
+ }
+
+ // constructed
+ // max memory
+ // constructed
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ AddListeners(listener),
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ <-block
+ p.Destroy(context.Background())
+}