diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 17:12:37 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-16 17:12:37 +0300 |
commit | f3491c089b4da77fd8d2bc942a88b6b8d117a8a5 (patch) | |
tree | 32bfffb1f24eeee7b909747cc00a6a6b9fd3ee83 /pool | |
parent | 5d2cd55ab522d4f1e65a833f91146444465a32ac (diff) |
Move plugins to a separate repository
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pool')
-rw-r--r-- | pool/config.go | 75 | ||||
-rw-r--r-- | pool/interface.go | 53 | ||||
-rwxr-xr-x | pool/static_pool.go | 374 | ||||
-rwxr-xr-x | pool/static_pool_test.go | 721 | ||||
-rwxr-xr-x | pool/supervisor_pool.go | 230 | ||||
-rw-r--r-- | pool/supervisor_test.go | 413 |
6 files changed, 1866 insertions, 0 deletions
diff --git a/pool/config.go b/pool/config.go new file mode 100644 index 00000000..3a058956 --- /dev/null +++ b/pool/config.go @@ -0,0 +1,75 @@ +package pool + +import ( + "runtime" + "time" +) + +// Config .. Pool config Configures the pool behavior. +type Config struct { + // Debug flag creates new fresh worker before every request. + Debug bool + + // NumWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. + NumWorkers uint64 `mapstructure:"num_workers"` + + // MaxJobs defines how many executions is allowed for the worker until + // it's destruction. set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. + MaxJobs uint64 `mapstructure:"max_jobs"` + + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. Defaults to 60s. + AllocateTimeout time.Duration `mapstructure:"allocate_timeout"` + + // DestroyTimeout defines for how long pool should be waiting for worker to + // properly destroy, if timeout reached worker will be killed. Defaults to 60s. + DestroyTimeout time.Duration `mapstructure:"destroy_timeout"` + + // Supervision config to limit worker and pool memory usage. + Supervisor *SupervisorConfig `mapstructure:"supervisor"` +} + +// InitDefaults enables default config values. +func (cfg *Config) InitDefaults() { + if cfg.NumWorkers == 0 { + cfg.NumWorkers = uint64(runtime.NumCPU()) + } + + if cfg.AllocateTimeout == 0 { + cfg.AllocateTimeout = time.Minute + } + + if cfg.DestroyTimeout == 0 { + cfg.DestroyTimeout = time.Minute + } + if cfg.Supervisor == nil { + return + } + cfg.Supervisor.InitDefaults() +} + +type SupervisorConfig struct { + // WatchTick defines how often to check the state of worker. + WatchTick time.Duration `mapstructure:"watch_tick"` + + // TTL defines maximum time worker is allowed to live. + TTL time.Duration `mapstructure:"ttl"` + + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. + IdleTTL time.Duration `mapstructure:"idle_ttl"` + + // ExecTTL defines maximum lifetime per job. + ExecTTL time.Duration `mapstructure:"exec_ttl"` + + // MaxWorkerMemory limits memory per worker. + MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"` +} + +// InitDefaults enables default config values. +func (cfg *SupervisorConfig) InitDefaults() { + if cfg.WatchTick == 0 { + cfg.WatchTick = time.Second + } +} diff --git a/pool/interface.go b/pool/interface.go new file mode 100644 index 00000000..d089092f --- /dev/null +++ b/pool/interface.go @@ -0,0 +1,53 @@ +package pool + +import ( + "context" + + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" +) + +// Pool managed set of inner worker processes. +type Pool interface { + // GetConfig returns pool configuration. + GetConfig() interface{} + + // Exec executes task with payload + Exec(rqs *payload.Payload) (*payload.Payload, error) + + // Workers returns worker list associated with the pool. + Workers() (workers []worker.BaseProcess) + + // RemoveWorker removes worker from the pool. + RemoveWorker(worker worker.BaseProcess) error + + // Destroy all underlying stack (but let them to complete the task). + Destroy(ctx context.Context) + + // ExecWithContext executes task with context which is used with timeout + execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error) +} + +// Watcher is an interface for the Sync workers lifecycle +type Watcher interface { + // Watch used to add workers to the container + Watch(workers []worker.BaseProcess) error + + // Take takes the first free worker + Take(ctx context.Context) (worker.BaseProcess, error) + + // Release releases the worker putting it back to the queue + Release(w worker.BaseProcess) + + // Allocate - allocates new worker and put it into the WorkerWatcher + Allocate() error + + // Destroy destroys the underlying container + Destroy(ctx context.Context) + + // List return all container w/o removing it from internal storage + List() []worker.BaseProcess + + // Remove will remove worker from the container + Remove(wb worker.BaseProcess) +} diff --git a/pool/static_pool.go b/pool/static_pool.go new file mode 100755 index 00000000..25097395 --- /dev/null +++ b/pool/static_pool.go @@ -0,0 +1,374 @@ +package pool + +import ( + "context" + "os/exec" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/transport" + "github.com/spiral/roadrunner/v2/utils" + "github.com/spiral/roadrunner/v2/worker" + workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher" +) + +// StopRequest can be sent by worker to indicate that restart is required. +const StopRequest = "{\"stop\":true}" + +// ErrorEncoder encode error or make a decision based on the error type +type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) + +type Options func(p *StaticPool) + +type Command func() *exec.Cmd + +// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. +type StaticPool struct { + cfg *Config + + // worker command creator + cmd Command + + // creates and connects to stack + factory transport.Factory + + // distributes the events + events events.Handler + + // saved list of event listeners + listeners []events.Listener + + // manages worker states and TTLs + ww Watcher + + // allocate new worker + allocator worker.Allocator + + // errEncoder is the default Exec error encoder + errEncoder ErrorEncoder +} + +// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. +func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { + const op = errors.Op("static_pool_initialize") + if factory == nil { + return nil, errors.E(op, errors.Str("no factory initialized")) + } + cfg.InitDefaults() + + if cfg.Debug { + cfg.NumWorkers = 0 + cfg.MaxJobs = 1 + } + + p := &StaticPool{ + cfg: cfg, + cmd: cmd, + factory: factory, + events: events.NewEventsHandler(), + } + + // add pool options + for i := 0; i < len(options); i++ { + options[i](p) + } + + // set up workers allocator + p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) + // set up workers watcher + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) + + // allocate requested number of workers + workers, err := p.allocateWorkers(p.cfg.NumWorkers) + if err != nil { + return nil, errors.E(op, err) + } + + // add workers to the watcher + err = p.ww.Watch(workers) + if err != nil { + return nil, errors.E(op, err) + } + + p.errEncoder = defaultErrEncoder(p) + + // if supervised config not nil, guess, that pool wanted to be supervised + if cfg.Supervisor != nil { + sp := supervisorWrapper(p, p.events, p.cfg.Supervisor) + // start watcher timer + sp.Start() + return sp, nil + } + + return p, nil +} + +func AddListeners(listeners ...events.Listener) Options { + return func(p *StaticPool) { + p.listeners = listeners + for i := 0; i < len(listeners); i++ { + p.addListener(listeners[i]) + } + } +} + +// AddListener connects event listener to the pool. +func (sp *StaticPool) addListener(listener events.Listener) { + sp.events.AddListener(listener) +} + +// GetConfig returns associated pool configuration. Immutable. +func (sp *StaticPool) GetConfig() interface{} { + return sp.cfg +} + +// Workers returns worker list associated with the pool. +func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { + return sp.ww.List() +} + +func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { + sp.ww.Remove(wb) + return nil +} + +// Exec executes provided payload on the worker +func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("static_pool_exec") + if sp.cfg.Debug { + return sp.execDebug(p) + } + ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.takeWorker(ctxGetFree, op) + if err != nil { + return nil, errors.E(op, err) + } + + rsp, err := w.(worker.SyncWorker).Exec(p) + if err != nil { + return sp.errEncoder(err, w) + } + + // worker want's to be terminated + if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest { + sp.stopWorker(w) + return sp.Exec(p) + } + + if sp.cfg.MaxJobs != 0 { + sp.checkMaxJobs(w) + return rsp, nil + } + // return worker back + sp.ww.Release(w) + return rsp, nil +} + +// Be careful, sync with pool.Exec method +func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("static_pool_exec_with_context") + if sp.cfg.Debug { + return sp.execDebugWithTTL(ctx, p) + } + + ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.takeWorker(ctxAlloc, op) + if err != nil { + return nil, errors.E(op, err) + } + + rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) + if err != nil { + return sp.errEncoder(err, w) + } + + // worker want's to be terminated + if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest { + sp.stopWorker(w) + return sp.execWithTTL(ctx, p) + } + + if sp.cfg.MaxJobs != 0 { + sp.checkMaxJobs(w) + return rsp, nil + } + + // return worker back + sp.ww.Release(w) + return rsp, nil +} + +func (sp *StaticPool) stopWorker(w worker.BaseProcess) { + const op = errors.Op("static_pool_stop_worker") + w.State().Set(worker.StateInvalid) + err := w.Stop() + if err != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + } +} + +// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs +//go:inline +func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) { + if w.State().NumExecs() >= sp.cfg.MaxJobs { + w.State().Set(worker.StateMaxJobsReached) + sp.ww.Release(w) + return + } + + sp.ww.Release(w) +} + +func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { + // Get function consumes context with timeout + w, err := sp.ww.Take(ctxGetFree) + if err != nil { + // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout + if errors.Is(errors.NoFreeWorkers, err) { + sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)}) + return nil, errors.E(op, err) + } + // else if err not nil - return error + return nil, errors.E(op, err) + } + return w, nil +} + +// Destroy all underlying stack (but let them complete the task). +func (sp *StaticPool) Destroy(ctx context.Context) { + sp.ww.Destroy(ctx) +} + +func defaultErrEncoder(sp *StaticPool) ErrorEncoder { + return func(err error, w worker.BaseProcess) (*payload.Payload, error) { + const op = errors.Op("error_encoder") + // just push event if on any stage was timeout error + switch { + case errors.Is(errors.ExecTTL, err): + sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: errors.E(op, err)}) + w.State().Set(worker.StateInvalid) + return nil, err + + case errors.Is(errors.SoftJob, err): + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + + // if max jobs exceed + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + // mark old as invalid and stop + w.State().Set(worker.StateInvalid) + errS := w.Stop() + if errS != nil { + return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + } + + return nil, err + } + + // soft jobs errors are allowed, just put the worker back + sp.ww.Release(w) + + return nil, err + case errors.Is(errors.Network, err): + // in case of network error, we can't stop the worker, we should kill it + w.State().Set(worker.StateInvalid) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + + // kill the worker instead of sending net packet to it + _ = w.Kill() + + return nil, err + default: + w.State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + // stop the worker, worker here might be in the broken state (network) + errS := w.Stop() + if errS != nil { + return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + } + + return nil, errors.E(op, err) + } + } +} + +func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { + return func() (worker.SyncWorker, error) { + ctxT, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...) + if err != nil { + return nil, err + } + + // wrap sync worker + sw := worker.From(w) + + sp.events.Push(events.PoolEvent{ + Event: events.EventWorkerConstruct, + Payload: sw, + }) + return sw, nil + } +} + +// execDebug used when debug mode was not set and exec_ttl is 0 +func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("static_pool_exec_debug") + sw, err := sp.allocator() + if err != nil { + return nil, err + } + + // redirect call to the workers' exec method (without ttl) + r, err := sw.Exec(p) + if err != nil { + return nil, errors.E(op, err) + } + + // destroy the worker + sw.State().Set(worker.StateDestroyed) + err = sw.Kill() + if err != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + return nil, errors.E(op, err) + } + + return r, nil +} + +// execDebugWithTTL used when user set debug mode and exec_ttl +func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { + sw, err := sp.allocator() + if err != nil { + return nil, err + } + + // redirect call to the worker with TTL + r, err := sw.ExecWithTTL(ctx, p) + if stopErr := sw.Stop(); stopErr != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + } + + return r, err +} + +// allocate required number of stack +func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { + const op = errors.Op("static_pool_allocate_workers") + workers := make([]worker.BaseProcess, 0, numWorkers) + + // constant number of stack simplify logic + for i := uint64(0); i < numWorkers; i++ { + w, err := sp.allocator() + if err != nil { + return nil, errors.E(op, errors.WorkerAllocate, err) + } + + workers = append(workers, w) + } + return workers, nil +} diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go new file mode 100755 index 00000000..29b0fc56 --- /dev/null +++ b/pool/static_pool_test.go @@ -0,0 +1,721 @@ +package pool + +import ( + "context" + "log" + "os/exec" + "runtime" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/transport/pipe" + "github.com/spiral/roadrunner/v2/utils" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" +) + +var cfg = &Config{ + NumWorkers: uint64(runtime.NumCPU()), + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, +} + +func Test_NewPool(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) +} + +func Test_StaticPool_Invalid(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, + pipe.NewPipeFactory(), + cfg, + ) + + assert.Nil(t, p) + assert.Error(t, err) +} + +func Test_ConfigNoErrorInitDefaults(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) +} + +func Test_StaticPool_Echo(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_NilContext(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_Context(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.Empty(t, res.Body) + assert.NotNil(t, res.Context) + + assert.Equal(t, "world", string(res.Context)) +} + +func Test_StaticPool_JobError(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + time.Sleep(time.Second * 2) + + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.Exec") + } + + assert.Contains(t, err.Error(), "hello") + p.Destroy(ctx) +} + +func Test_StaticPool_Broken_Replace(t *testing.T) { + ctx := context.Background() + block := make(chan struct{}, 10) + + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerStderr { + e := string(wev.Payload.([]byte)) + if strings.ContainsAny(e, "undefined_function()") { + block <- struct{}{} + return + } + } + } + } + + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, + pipe.NewPipeFactory(), + cfg, + AddListeners(listener), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + time.Sleep(time.Second) + res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res) + + <-block + + p.Destroy(ctx) +} + +func Test_StaticPool_Broken_FromOutside(t *testing.T) { + ctx := context.Background() + // Run pool events + ev := make(chan struct{}, 1) + listener := func(event interface{}) { + if pe, ok := event.(events.PoolEvent); ok { + if pe.Event == events.EventWorkerConstruct { + ev <- struct{}{} + } + } + } + + var cfg2 = &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + } + + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg2, + AddListeners(listener), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(ctx) + time.Sleep(time.Second) + + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) + assert.Equal(t, 1, len(p.Workers())) + + // first creation + <-ev + // killing random worker and expecting pool to replace it + err = p.Workers()[0].Kill() + if err != nil { + t.Errorf("error killing the process: error %v", err) + } + + // re-creation + <-ev + + list := p.Workers() + for _, w := range list { + assert.Equal(t, worker.StateReady, w.State().Value()) + } +} + +func Test_StaticPool_AllocateTimeout(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 1, + AllocateTimeout: time.Nanosecond * 1, + DestroyTimeout: time.Second * 2, + }, + ) + assert.Error(t, err) + if !errors.Is(errors.WorkerAllocate, err) { + t.Fatal("error should be of type WorkerAllocate") + } + assert.Nil(t, p) +} + +func Test_StaticPool_Replace_Worker(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + defer p.Destroy(ctx) + // prevent process is not ready + time.Sleep(time.Second) + + var lastPID string + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + + res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +func Test_StaticPool_Debug_Worker(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + Debug: true, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + defer p.Destroy(ctx) + + // prevent process is not ready + time.Sleep(time.Second) + assert.Len(t, p.Workers(), 0) + + var lastPID string + res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotEqual(t, lastPID, string(res.Body)) + + assert.Len(t, p.Workers(), 0) + + for i := 0; i < 10; i++ { + assert.Len(t, p.Workers(), 0) + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +// identical to replace but controlled on worker side +func Test_StaticPool_Stop_Worker(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + defer p.Destroy(ctx) + time.Sleep(time.Second) + + var lastPID string + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Destroy_And_Close(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + p.Destroy(ctx) + _, err = p.Exec(&payload.Payload{Body: []byte("100")}) + assert.Error(t, err) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + go func() { + _, errP := p.Exec(&payload.Payload{Body: []byte("100")}) + if errP != nil { + t.Errorf("error executing payload: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + + p.Destroy(ctx) + _, err = p.Exec(&payload.Payload{Body: []byte("100")}) + assert.Error(t, err) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Handle_Dead(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 5, + AllocateTimeout: time.Second * 100, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + time.Sleep(time.Second) + for i := range p.Workers() { + p.Workers()[i].State().Set(worker.StateErrored) + } + + _, err = p.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NoError(t, err) + p.Destroy(ctx) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Slow_Destroy(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + p.Destroy(context.Background()) +} + +func Test_StaticPool_NoFreeWorkers(t *testing.T) { + ctx := context.Background() + block := make(chan struct{}, 10) + + listener := func(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventNoFreeWorkers { + block <- struct{}{} + } + } + } + + p, err := Initialize( + ctx, + // sleep for the 3 seconds + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + Debug: false, + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: nil, + }, + AddListeners(listener), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + go func() { + _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) + }() + + time.Sleep(time.Second) + res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res) + + <-block + + p.Destroy(ctx) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_WrongCommand1(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Error(t, err) + assert.Nil(t, p) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_WrongCommand2(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Error(t, err) + assert.Nil(t, p) +} + +/* PTR: +Benchmark_Pool_Echo-32 49076 29926 ns/op 8016 B/op 20 allocs/op +Benchmark_Pool_Echo-32 47257 30779 ns/op 8047 B/op 20 allocs/op +Benchmark_Pool_Echo-32 46737 29440 ns/op 8065 B/op 20 allocs/op +Benchmark_Pool_Echo-32 51177 29074 ns/op 7981 B/op 20 allocs/op +Benchmark_Pool_Echo-32 51764 28319 ns/op 8012 B/op 20 allocs/op +Benchmark_Pool_Echo-32 54054 30714 ns/op 7987 B/op 20 allocs/op +Benchmark_Pool_Echo-32 54391 30689 ns/op 8055 B/op 20 allocs/op + +VAL: +Benchmark_Pool_Echo-32 47936 28679 ns/op 7942 B/op 19 allocs/op +Benchmark_Pool_Echo-32 49010 29830 ns/op 7970 B/op 19 allocs/op +Benchmark_Pool_Echo-32 46771 29031 ns/op 8014 B/op 19 allocs/op +Benchmark_Pool_Echo-32 47760 30517 ns/op 7955 B/op 19 allocs/op +Benchmark_Pool_Echo-32 48148 29816 ns/op 7950 B/op 19 allocs/op +Benchmark_Pool_Echo-32 52705 29809 ns/op 7979 B/op 19 allocs/op +Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allocs/op +*/ +func Benchmark_Pool_Echo(b *testing.B) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + if err != nil { + b.Fatal(err) + } + + bd := make([]byte, 1024) + c := make([]byte, 1024) + + pld := &payload.Payload{ + Context: c, + Body: bd, + } + + b.ResetTimer() + b.ReportAllocs() + for n := 0; n < b.N; n++ { + if _, err := p.Exec(pld); err != nil { + b.Fail() + } + } +} + +// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op +// PTR -> Benchmark_Pool_Echo_Batched-32 406839 2900 ns/op 1059 B/op 23 allocs/op +// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op +func Benchmark_Pool_Echo_Batched(b *testing.B) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: uint64(runtime.NumCPU()), + AllocateTimeout: time.Second * 100, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(b, err) + defer p.Destroy(ctx) + + bd := make([]byte, 1024) + c := make([]byte, 1024) + + pld := &payload.Payload{ + Context: c, + Body: bd, + } + + b.ResetTimer() + b.ReportAllocs() + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if _, err := p.Exec(pld); err != nil { + b.Fail() + log.Println(err) + } + }() + } + + wg.Wait() +} + +// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op +func Benchmark_Pool_Echo_Replaced(b *testing.B) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + &Config{ + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(b, err) + defer p.Destroy(ctx) + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + log.Println(err) + } + } +} + +// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op +// BenchmarkToStringUnsafe-32 1000000000 0.4434 ns/op 0 B/op 0 allocs/op +func BenchmarkToStringUnsafe(b *testing.B) { + testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + res := utils.AsString(testPayload) + _ = res + } +} + +// BenchmarkToStringSafe-32 8017846 182.5 ns/op 896 B/op 1 allocs/op +// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op +func BenchmarkToStringSafe(b *testing.B) { + testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + res := toStringNotFun(testPayload) + _ = res + } +} + +func toStringNotFun(data []byte) string { + return string(data) +} diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go new file mode 100755 index 00000000..99af168c --- /dev/null +++ b/pool/supervisor_pool.go @@ -0,0 +1,230 @@ +package pool + +import ( + "context" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/state/process" + "github.com/spiral/roadrunner/v2/worker" +) + +const MB = 1024 * 1024 + +// NSEC_IN_SEC nanoseconds in second +const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck + +type Supervised interface { + Pool + // Start used to start watching process for all pool workers + Start() +} + +type supervised struct { + cfg *SupervisorConfig + events events.Handler + pool Pool + stopCh chan struct{} + mu *sync.RWMutex +} + +func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { + sp := &supervised{ + cfg: cfg, + events: events, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), + } + + return sp +} + +func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) { + panic("used to satisfy pool interface") +} + +func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("supervised_exec_with_context") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) + defer cancel() + + res, err := sp.pool.execWithTTL(ctx, rqs) + if err != nil { + return nil, errors.E(op, err) + } + + return res, nil +} + +func (sp *supervised) GetConfig() interface{} { + return sp.pool.GetConfig() +} + +func (sp *supervised) Workers() (workers []worker.BaseProcess) { + sp.mu.Lock() + defer sp.mu.Unlock() + return sp.pool.Workers() +} + +func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { + return sp.pool.RemoveWorker(worker) +} + +func (sp *supervised) Destroy(ctx context.Context) { + sp.pool.Destroy(ctx) +} + +func (sp *supervised) Start() { + go func() { + watchTout := time.NewTicker(sp.cfg.WatchTick) + for { + select { + case <-sp.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + sp.mu.Lock() + sp.control() + sp.mu.Unlock() + } + } + }() +} + +func (sp *supervised) Stop() { + sp.stopCh <- struct{}{} +} + +func (sp *supervised) control() { //nolint:gocognit + now := time.Now() + + // MIGHT BE OUTDATED + // It's a copy of the Workers pointers + workers := sp.pool.Workers() + + for i := 0; i < len(workers); i++ { + // if worker not in the Ready OR working state + // skip such worker + switch workers[i].State().Value() { + case + worker.StateInvalid, + worker.StateErrored, + worker.StateDestroyed, + worker.StateInactive, + worker.StateStopped, + worker.StateStopping, + worker.StateKilling: + continue + } + + s, err := process.WorkerProcessState(workers[i]) + if err != nil { + // worker not longer valid for supervision + continue + } + + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + // just to double check + workers[i].State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + continue + } + + if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + // just to double check + workers[i].State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + continue + } + + // firs we check maxWorker idle + if sp.cfg.IdleTTL != 0 { + // then check for the worker state + if workers[i].State().Value() != worker.StateReady { + continue + } + + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + + // 1610530005534416045 lu + // lu - now = -7811150814 - nanoseconds + // 7.8 seconds + // get last used unix nano + lu := workers[i].State().LastUsed() + // worker not used, skip + if lu == 0 { + continue + } + + // convert last used to unixNano and sub time.now to seconds + // negative number, because lu always in the past, except for the `back to the future` :) + res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 + + // maxWorkerIdle more than diff between now and last used + // for example: + // After exec worker goes to the rest + // And resting for the 5 seconds + // IdleTTL is 1 second. + // After the control check, res will be 5, idle is 1 + // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. + if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + // just to double-check + workers[i].State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + } + } + } +} diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go new file mode 100644 index 00000000..e76c7ec4 --- /dev/null +++ b/pool/supervisor_test.go @@ -0,0 +1,413 @@ +package pool + +import ( + "context" + "os" + "os/exec" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/transport/pipe" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var cfgSupervised = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 100 * time.Second, + MaxWorkerMemory: 100, + }, +} + +func TestSupervisedPool_Exec(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgSupervised, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + time.Sleep(time.Second) + + pidBefore := p.Workers()[0].Pid() + + for i := 0; i < 100; i++ { + time.Sleep(time.Millisecond * 100) + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) + } + + assert.NotEqual(t, pidBefore, p.Workers()[0].Pid()) + + p.Destroy(context.Background()) +} + +// This test should finish without freezes +func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { + var cfgSupervised = cfgSupervised + cfgSupervised.Debug = true + + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/supervised.php") }, + pipe.NewPipeFactory(), + cfgSupervised, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + time.Sleep(time.Second) + + for i := 0; i < 100; i++ { + time.Sleep(time.Millisecond * 500) + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) + } + + p.Destroy(context.Background()) +} + +func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 1 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Error(t, err) + assert.Empty(t, resp) + + time.Sleep(time.Second * 1) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) +} + +func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 5 * time.Second, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + pid := p.Workers()[0].Pid() + + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Equal(t, string(resp.Body), "hello world") + assert.Empty(t, resp.Context) + + time.Sleep(time.Second) + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) + pid = p.Workers()[0].Pid() + + resp, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Equal(t, string(resp.Body), "hello world") + assert.Empty(t, resp.Context) + + time.Sleep(time.Second) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) + + p.Destroy(context.Background()) +} + +func TestSupervisedPool_Idle(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 1 * time.Second, + ExecTTL: 100 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/idle.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + pid := p.Workers()[0].Pid() + + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Nil(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 5) + + // worker should be marked as invalid and reallocated + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + p.Destroy(context.Background()) +} + +func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 1 * time.Second, + IdleTTL: 1 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + time.Sleep(time.Millisecond * 100) + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 2) + // should be destroyed, state should be Ready, not Invalid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + assert.Equal(t, int64(1), p.Workers()[0].State().Value()) +} + +func TestSupervisedPool_ExecTTL_OK(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + time.Sleep(time.Millisecond * 100) + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 1) + // should be the same pid + assert.Equal(t, pid, p.Workers()[0].Pid()) +} + +func TestSupervisedPool_MaxMemoryReached(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, + MaxWorkerMemory: 1, + }, + } + + block := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventMaxMemory { + block <- struct{}{} + } + } + } + + // constructed + // max memory + // constructed + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + AddListeners(listener), + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + <-block + p.Destroy(context.Background()) +} + +func TestSupervisedPool_AllocateFailedOK(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(2), + AllocateTimeout: time.Second * 15, + DestroyTimeout: time.Second * 5, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 5 * time.Second, + }, + } + + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + require.NotNil(t, p) + + time.Sleep(time.Second) + + // should be ok + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + require.NoError(t, err) + + // after creating this file, PHP will fail + file, err := os.Create("break") + require.NoError(t, err) + + time.Sleep(time.Second * 5) + assert.NoError(t, file.Close()) + assert.NoError(t, os.Remove("break")) + + defer func() { + if r := recover(); r != nil { + assert.Fail(t, "panic should not be fired!") + } else { + p.Destroy(context.Background()) + } + }() +} |