diff options
author | Valery Piashchynski <[email protected]> | 2022-01-15 12:08:20 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2022-01-15 12:08:20 +0300 |
commit | 5254c8eb27311e2a8a53a4c90c3829cf1238c563 (patch) | |
tree | b51c9a4c1dd4c25adc511498ce0380a7078c5572 /pool | |
parent | 13609dd03dd0d2fa85b9fb850be787bf4e2ea67f (diff) |
Repository content update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pool')
-rw-r--r-- | pool/config.go | 75 | ||||
-rw-r--r-- | pool/interface.go | 59 | ||||
-rwxr-xr-x | pool/static_pool.go | 392 | ||||
-rwxr-xr-x | pool/static_pool_test.go | 733 | ||||
-rwxr-xr-x | pool/supervisor_pool.go | 247 | ||||
-rw-r--r-- | pool/supervisor_test.go | 437 |
6 files changed, 0 insertions, 1943 deletions
diff --git a/pool/config.go b/pool/config.go deleted file mode 100644 index 3a058956..00000000 --- a/pool/config.go +++ /dev/null @@ -1,75 +0,0 @@ -package pool - -import ( - "runtime" - "time" -) - -// Config .. Pool config Configures the pool behavior. -type Config struct { - // Debug flag creates new fresh worker before every request. - Debug bool - - // NumWorkers defines how many sub-processes can be run at once. This value - // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. - NumWorkers uint64 `mapstructure:"num_workers"` - - // MaxJobs defines how many executions is allowed for the worker until - // it's destruction. set 1 to create new process for each new task, 0 to let - // worker handle as many tasks as it can. - MaxJobs uint64 `mapstructure:"max_jobs"` - - // AllocateTimeout defines for how long pool will be waiting for a worker to - // be freed to handle the task. Defaults to 60s. - AllocateTimeout time.Duration `mapstructure:"allocate_timeout"` - - // DestroyTimeout defines for how long pool should be waiting for worker to - // properly destroy, if timeout reached worker will be killed. Defaults to 60s. - DestroyTimeout time.Duration `mapstructure:"destroy_timeout"` - - // Supervision config to limit worker and pool memory usage. - Supervisor *SupervisorConfig `mapstructure:"supervisor"` -} - -// InitDefaults enables default config values. -func (cfg *Config) InitDefaults() { - if cfg.NumWorkers == 0 { - cfg.NumWorkers = uint64(runtime.NumCPU()) - } - - if cfg.AllocateTimeout == 0 { - cfg.AllocateTimeout = time.Minute - } - - if cfg.DestroyTimeout == 0 { - cfg.DestroyTimeout = time.Minute - } - if cfg.Supervisor == nil { - return - } - cfg.Supervisor.InitDefaults() -} - -type SupervisorConfig struct { - // WatchTick defines how often to check the state of worker. - WatchTick time.Duration `mapstructure:"watch_tick"` - - // TTL defines maximum time worker is allowed to live. - TTL time.Duration `mapstructure:"ttl"` - - // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL time.Duration `mapstructure:"idle_ttl"` - - // ExecTTL defines maximum lifetime per job. - ExecTTL time.Duration `mapstructure:"exec_ttl"` - - // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"` -} - -// InitDefaults enables default config values. -func (cfg *SupervisorConfig) InitDefaults() { - if cfg.WatchTick == 0 { - cfg.WatchTick = time.Second - } -} diff --git a/pool/interface.go b/pool/interface.go deleted file mode 100644 index 6a150188..00000000 --- a/pool/interface.go +++ /dev/null @@ -1,59 +0,0 @@ -package pool - -import ( - "context" - - "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/worker" -) - -// Pool managed set of inner worker processes. -type Pool interface { - // GetConfig returns pool configuration. - GetConfig() interface{} - - // Exec executes task with payload - Exec(rqs *payload.Payload) (*payload.Payload, error) - - // Workers returns worker list associated with the pool. - Workers() (workers []worker.BaseProcess) - - // RemoveWorker removes worker from the pool. - RemoveWorker(worker worker.BaseProcess) error - - // Reset kill all workers inside the watcher and replaces with new - Reset(ctx context.Context) error - - // Destroy all underlying stack (but let them to complete the task). - Destroy(ctx context.Context) - - // ExecWithContext executes task with context which is used with timeout - execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error) -} - -// Watcher is an interface for the Sync workers lifecycle -type Watcher interface { - // Watch used to add workers to the container - Watch(workers []worker.BaseProcess) error - - // Take takes the first free worker - Take(ctx context.Context) (worker.BaseProcess, error) - - // Release releases the worker putting it back to the queue - Release(w worker.BaseProcess) - - // Allocate - allocates new worker and put it into the WorkerWatcher - Allocate() error - - // Destroy destroys the underlying container - Destroy(ctx context.Context) - - // Reset will replace container and workers array, kill all workers - Reset(ctx context.Context) - - // List return all container w/o removing it from internal storage - List() []worker.BaseProcess - - // Remove will remove worker from the container - Remove(wb worker.BaseProcess) -} diff --git a/pool/static_pool.go b/pool/static_pool.go deleted file mode 100755 index dfd9ffd3..00000000 --- a/pool/static_pool.go +++ /dev/null @@ -1,392 +0,0 @@ -package pool - -import ( - "context" - "os/exec" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/events" - "github.com/spiral/roadrunner/v2/ipc" - "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/utils" - "github.com/spiral/roadrunner/v2/worker" - workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher" - "go.uber.org/zap" -) - -const ( - // StopRequest can be sent by worker to indicate that restart is required. - StopRequest = `{"stop":true}` -) - -// ErrorEncoder encode error or make a decision based on the error type -type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) - -type Options func(p *StaticPool) - -type Command func() *exec.Cmd - -// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. -type StaticPool struct { - cfg *Config - log *zap.Logger - - // worker command creator - cmd Command - - // creates and connects to stack - factory ipc.Factory - - // manages worker states and TTLs - ww Watcher - - // allocate new worker - allocator worker.Allocator - - // errEncoder is the default Exec error encoder - errEncoder ErrorEncoder -} - -// NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func NewStaticPool(ctx context.Context, cmd Command, factory ipc.Factory, cfg *Config, options ...Options) (Pool, error) { - if factory == nil { - return nil, errors.Str("no factory initialized") - } - cfg.InitDefaults() - - if cfg.Debug { - cfg.NumWorkers = 0 - cfg.MaxJobs = 1 - } - - p := &StaticPool{ - cfg: cfg, - cmd: cmd, - factory: factory, - } - - // add pool options - for i := 0; i < len(options); i++ { - options[i](p) - } - - if p.log == nil { - z, err := zap.NewProduction() - if err != nil { - return nil, err - } - - p.log = z - } - - // set up workers allocator - p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) - // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.log, p.cfg.NumWorkers, p.cfg.AllocateTimeout) - - // allocate requested number of workers - workers, err := p.allocateWorkers(p.cfg.NumWorkers) - if err != nil { - return nil, err - } - - // add workers to the watcher - err = p.ww.Watch(workers) - if err != nil { - return nil, err - } - - p.errEncoder = defaultErrEncoder(p) - - // if supervised config not nil, guess, that pool wanted to be supervised - if cfg.Supervisor != nil { - sp := supervisorWrapper(p, p.log, p.cfg.Supervisor) - // start watcher timer - sp.Start() - return sp, nil - } - - return p, nil -} - -func WithLogger(z *zap.Logger) Options { - return func(p *StaticPool) { - p.log = z - } -} - -// GetConfig returns associated pool configuration. Immutable. -func (sp *StaticPool) GetConfig() interface{} { - return sp.cfg -} - -// Workers returns worker list associated with the pool. -func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { - return sp.ww.List() -} - -func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { - sp.ww.Remove(wb) - return nil -} - -// Exec executes provided payload on the worker -func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("static_pool_exec") - if sp.cfg.Debug { - return sp.execDebug(p) - } - ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) - defer cancel() - w, err := sp.takeWorker(ctxGetFree, op) - if err != nil { - return nil, errors.E(op, err) - } - - rsp, err := w.(worker.SyncWorker).Exec(p) - if err != nil { - return sp.errEncoder(err, w) - } - - // worker want's to be terminated - if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest { - sp.stopWorker(w) - return sp.Exec(p) - } - - if sp.cfg.MaxJobs != 0 { - sp.checkMaxJobs(w) - return rsp, nil - } - // return worker back - sp.ww.Release(w) - return rsp, nil -} - -// Destroy all underlying stack (but let them complete the task). -func (sp *StaticPool) Destroy(ctx context.Context) { - sp.ww.Destroy(ctx) -} - -func (sp *StaticPool) Reset(ctx context.Context) error { - // destroy all workers - sp.ww.Reset(ctx) - workers, err := sp.allocateWorkers(sp.cfg.NumWorkers) - if err != nil { - return err - } - // add the NEW workers to the watcher - err = sp.ww.Watch(workers) - if err != nil { - return err - } - - return nil -} - -func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (*payload.Payload, error) { - // just push event if on any stage was timeout error - switch { - case errors.Is(errors.ExecTTL, err): - sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err)) - w.State().Set(worker.StateInvalid) - return nil, err - - case errors.Is(errors.SoftJob, err): - sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "worker error"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) - // if max jobs exceed - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - // mark old as invalid and stop - w.State().Set(worker.StateInvalid) - errS := w.Stop() - if errS != nil { - return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) - } - - return nil, err - } - - // soft jobs errors are allowed, just put the worker back - sp.ww.Release(w) - - return nil, err - case errors.Is(errors.Network, err): - // in case of network error, we can't stop the worker, we should kill it - w.State().Set(worker.StateInvalid) - sp.log.Warn("network error, worker will be restarted", zap.String("reason", "network"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) - // kill the worker instead of sending net packet to it - _ = w.Kill() - - return nil, err - default: - w.State().Set(worker.StateInvalid) - sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err)) - // stop the worker, worker here might be in the broken state (network) - errS := w.Stop() - if errS != nil { - return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS)) - } - - return nil, err - } - } -} - -// Be careful, sync with pool.Exec method -func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("static_pool_exec_with_context") - if sp.cfg.Debug { - return sp.execDebugWithTTL(ctx, p) - } - - ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) - defer cancel() - w, err := sp.takeWorker(ctxAlloc, op) - if err != nil { - return nil, errors.E(op, err) - } - - rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) - if err != nil { - return sp.errEncoder(err, w) - } - - // worker want's to be terminated - if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest { - sp.stopWorker(w) - return sp.execWithTTL(ctx, p) - } - - if sp.cfg.MaxJobs != 0 { - sp.checkMaxJobs(w) - return rsp, nil - } - - // return worker back - sp.ww.Release(w) - return rsp, nil -} - -func (sp *StaticPool) stopWorker(w worker.BaseProcess) { - w.State().Set(worker.StateInvalid) - err := w.Stop() - if err != nil { - sp.log.Warn("user requested worker to be stopped", zap.String("reason", "user event"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) - } -} - -// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs -func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) { - if w.State().NumExecs() >= sp.cfg.MaxJobs { - w.State().Set(worker.StateMaxJobsReached) - sp.ww.Release(w) - return - } - - sp.ww.Release(w) -} - -func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { - // Get function consumes context with timeout - w, err := sp.ww.Take(ctxGetFree) - if err != nil { - // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout - if errors.Is(errors.NoFreeWorkers, err) { - sp.log.Error("no free workers in the pool, wait timeout exceed", zap.String("reason", "no free workers"), zap.String("internal_event_name", events.EventNoFreeWorkers.String()), zap.Error(err)) - return nil, errors.E(op, err) - } - // else if err not nil - return error - return nil, errors.E(op, err) - } - return w, nil -} - -func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory ipc.Factory, cmd func() *exec.Cmd) worker.Allocator { - return func() (worker.SyncWorker, error) { - ctxT, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd()) - if err != nil { - return nil, err - } - - // wrap sync worker - sw := worker.From(w) - - sp.log.Debug("worker is allocated", zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String())) - return sw, nil - } -} - -// execDebug used when debug mode was not set and exec_ttl is 0 -func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { - sw, err := sp.allocator() - if err != nil { - return nil, err - } - - // redirect call to the workers' exec method (without ttl) - r, err := sw.Exec(p) - if err != nil { - return nil, err - } - - go func() { - // read the exit status to prevent process to be a zombie - _ = sw.Wait() - }() - - // destroy the worker - err = sw.Stop() - if err != nil { - sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) - return nil, err - } - - return r, nil -} - -// execDebugWithTTL used when user set debug mode and exec_ttl -func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { - sw, err := sp.allocator() - if err != nil { - return nil, err - } - - // redirect call to the worker with TTL - r, err := sw.ExecWithTTL(ctx, p) - if err != nil { - return nil, err - } - - go func() { - // read the exit status to prevent process to be a zombie - _ = sw.Wait() - }() - - err = sw.Stop() - if err != nil { - sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err)) - return nil, err - } - - return r, err -} - -// allocate required number of stack -func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { - workers := make([]worker.BaseProcess, 0, numWorkers) - - // constant number of stack simplify logic - for i := uint64(0); i < numWorkers; i++ { - w, err := sp.allocator() - if err != nil { - return nil, errors.E(errors.WorkerAllocate, err) - } - - workers = append(workers, w) - } - return workers, nil -} diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go deleted file mode 100755 index 5db2bd86..00000000 --- a/pool/static_pool_test.go +++ /dev/null @@ -1,733 +0,0 @@ -package pool - -import ( - "context" - l "log" - "os/exec" - "runtime" - "strconv" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/ipc/pipe" - "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/utils" - "github.com/spiral/roadrunner/v2/worker" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -var cfg = &Config{ - NumWorkers: uint64(runtime.NumCPU()), - AllocateTimeout: time.Second * 500, - DestroyTimeout: time.Second * 500, -} - -var log = zap.NewNop() - -func Test_NewPool(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - cfg, - ) - assert.NoError(t, err) - - defer p.Destroy(ctx) - - assert.NotNil(t, p) -} - -func Test_NewPoolReset(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - cfg, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - w := p.Workers() - if len(w) == 0 { - t.Fatal("should be workers inside") - } - - pid := w[0].Pid() - require.NoError(t, p.Reset(context.Background())) - - w2 := p.Workers() - if len(w2) == 0 { - t.Fatal("should be workers inside") - } - - require.NotEqual(t, pid, w2[0].Pid()) - p.Destroy(ctx) -} - -func Test_StaticPool_Invalid(t *testing.T) { - p, err := NewStaticPool( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") }, - pipe.NewPipeFactory(log), - cfg, - ) - - assert.Nil(t, p) - assert.Error(t, err) -} - -func Test_ConfigNoErrorInitDefaults(t *testing.T) { - p, err := NewStaticPool( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.NotNil(t, p) - assert.NoError(t, err) - p.Destroy(context.Background()) -} - -func Test_StaticPool_Echo(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - cfg, - ) - assert.NoError(t, err) - - defer p.Destroy(ctx) - - assert.NotNil(t, p) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_StaticPool_Echo_NilContext(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - cfg, - ) - assert.NoError(t, err) - - defer p.Destroy(ctx) - - assert.NotNil(t, p) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_StaticPool_Echo_Context(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") }, - pipe.NewPipeFactory(log), - cfg, - ) - assert.NoError(t, err) - - defer p.Destroy(ctx) - - assert.NotNil(t, p) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.Empty(t, res.Body) - assert.NotNil(t, res.Context) - - assert.Equal(t, "world", string(res.Context)) -} - -func Test_StaticPool_JobError(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") }, - pipe.NewPipeFactory(log), - cfg, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second * 2) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - - if errors.Is(errors.SoftJob, err) == false { - t.Fatal("error should be of type errors.Exec") - } - - assert.Contains(t, err.Error(), "hello") - p.Destroy(ctx) -} - -func Test_StaticPool_Broken_Replace(t *testing.T) { - ctx := context.Background() - - z, err := zap.NewProduction() - require.NoError(t, err) - - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") }, - pipe.NewPipeFactory(log), - cfg, - WithLogger(z), - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - - p.Destroy(ctx) -} - -func Test_StaticPool_Broken_FromOutside(t *testing.T) { - ctx := context.Background() - - var cfg2 = &Config{ - NumWorkers: 1, - AllocateTimeout: time.Second * 5, - DestroyTimeout: time.Second * 5, - } - - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - cfg2, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - defer p.Destroy(ctx) - time.Sleep(time.Second) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) - assert.Equal(t, 1, len(p.Workers())) - - // first creation - time.Sleep(time.Second * 2) - // killing random worker and expecting pool to replace it - err = p.Workers()[0].Kill() - if err != nil { - t.Errorf("error killing the process: error %v", err) - } - - // re-creation - time.Sleep(time.Second * 2) - list := p.Workers() - for _, w := range list { - assert.Equal(t, worker.StateReady, w.State().Value()) - } -} - -func Test_StaticPool_AllocateTimeout(t *testing.T) { - p, err := NewStaticPool( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 1, - AllocateTimeout: time.Nanosecond * 1, - DestroyTimeout: time.Second * 2, - }, - ) - assert.Error(t, err) - if !errors.Is(errors.WorkerAllocate, err) { - t.Fatal("error should be of type WorkerAllocate") - } - assert.Nil(t, p) -} - -func Test_StaticPool_Replace_Worker(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 1, - MaxJobs: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - defer p.Destroy(ctx) - // prevent process is not ready - time.Sleep(time.Second) - - var lastPID string - lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - - res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Equal(t, lastPID, string(res.Body)) - - for i := 0; i < 10; i++ { - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.NotEqual(t, lastPID, string(res.Body)) - lastPID = string(res.Body) - } -} - -func Test_StaticPool_Debug_Worker(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - Debug: true, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - defer p.Destroy(ctx) - - // prevent process is not ready - time.Sleep(time.Second) - assert.Len(t, p.Workers(), 0) - - var lastPID string - res, _ := p.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotEqual(t, lastPID, string(res.Body)) - - assert.Len(t, p.Workers(), 0) - - for i := 0; i < 10; i++ { - assert.Len(t, p.Workers(), 0) - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.NotEqual(t, lastPID, string(res.Body)) - lastPID = string(res.Body) - } -} - -// identical to replace but controlled on worker side -func Test_StaticPool_Stop_Worker(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - defer p.Destroy(ctx) - time.Sleep(time.Second) - - var lastPID string - lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, lastPID, string(res.Body)) - - for i := 0; i < 10; i++ { - res, err := p.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.NotEqual(t, lastPID, string(res.Body)) - lastPID = string(res.Body) - } -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_Destroy_And_Close(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.NotNil(t, p) - assert.NoError(t, err) - - p.Destroy(ctx) - _, err = p.Exec(&payload.Payload{Body: []byte("100")}) - assert.Error(t, err) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.NotNil(t, p) - assert.NoError(t, err) - - go func() { - _, errP := p.Exec(&payload.Payload{Body: []byte("100")}) - if errP != nil { - t.Errorf("error executing payload: error %v", err) - } - }() - time.Sleep(time.Millisecond * 100) - - p.Destroy(ctx) - _, err = p.Exec(&payload.Payload{Body: []byte("100")}) - assert.Error(t, err) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_Handle_Dead(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 5, - AllocateTimeout: time.Second * 100, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second) - for i := range p.Workers() { - p.Workers()[i].State().Set(worker.StateErrored) - } - - _, err = p.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NoError(t, err) - p.Destroy(ctx) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_Slow_Destroy(t *testing.T) { - p, err := NewStaticPool( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 5, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - p.Destroy(context.Background()) -} - -func Test_StaticPool_NoFreeWorkers(t *testing.T) { - ctx := context.Background() - - p, err := NewStaticPool( - ctx, - // sleep for the 3 seconds - func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - Debug: false, - NumWorkers: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: nil, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - go func() { - _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) - }() - - time.Sleep(time.Second) - res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - - time.Sleep(time.Second) - - p.Destroy(ctx) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_WrongCommand1(t *testing.T) { - p, err := NewStaticPool( - context.Background(), - func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 5, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.Error(t, err) - assert.Nil(t, p) -} - -// identical to replace but controlled on worker side -func Test_Static_Pool_WrongCommand2(t *testing.T) { - p, err := NewStaticPool( - context.Background(), - func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 5, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - - assert.Error(t, err) - assert.Nil(t, p) -} - -func Test_CRC_WithPayload(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") }, - pipe.NewPipeFactory(log), - cfg, - ) - assert.Error(t, err) - data := err.Error() - assert.Contains(t, data, "warning: some weird php erro") - require.Nil(t, p) -} - -/* PTR: -Benchmark_Pool_Echo-32 49076 29926 ns/op 8016 B/op 20 allocs/op -Benchmark_Pool_Echo-32 47257 30779 ns/op 8047 B/op 20 allocs/op -Benchmark_Pool_Echo-32 46737 29440 ns/op 8065 B/op 20 allocs/op -Benchmark_Pool_Echo-32 51177 29074 ns/op 7981 B/op 20 allocs/op -Benchmark_Pool_Echo-32 51764 28319 ns/op 8012 B/op 20 allocs/op -Benchmark_Pool_Echo-32 54054 30714 ns/op 7987 B/op 20 allocs/op -Benchmark_Pool_Echo-32 54391 30689 ns/op 8055 B/op 20 allocs/op - -VAL: -Benchmark_Pool_Echo-32 47936 28679 ns/op 7942 B/op 19 allocs/op -Benchmark_Pool_Echo-32 49010 29830 ns/op 7970 B/op 19 allocs/op -Benchmark_Pool_Echo-32 46771 29031 ns/op 8014 B/op 19 allocs/op -Benchmark_Pool_Echo-32 47760 30517 ns/op 7955 B/op 19 allocs/op -Benchmark_Pool_Echo-32 48148 29816 ns/op 7950 B/op 19 allocs/op -Benchmark_Pool_Echo-32 52705 29809 ns/op 7979 B/op 19 allocs/op -Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allocs/op -*/ -func Benchmark_Pool_Echo(b *testing.B) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - cfg, - ) - if err != nil { - b.Fatal(err) - } - - bd := make([]byte, 1024) - c := make([]byte, 1024) - - pld := &payload.Payload{ - Context: c, - Body: bd, - } - - b.ResetTimer() - b.ReportAllocs() - for n := 0; n < b.N; n++ { - if _, err := p.Exec(pld); err != nil { - b.Fail() - } - } -} - -// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op -// PTR -> Benchmark_Pool_Echo_Batched-32 406839 2900 ns/op 1059 B/op 23 allocs/op -// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op -func Benchmark_Pool_Echo_Batched(b *testing.B) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: uint64(runtime.NumCPU()), - AllocateTimeout: time.Second * 100, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(b, err) - defer p.Destroy(ctx) - - bd := make([]byte, 1024) - c := make([]byte, 1024) - - pld := &payload.Payload{ - Context: c, - Body: bd, - } - - b.ResetTimer() - b.ReportAllocs() - - var wg sync.WaitGroup - for i := 0; i < b.N; i++ { - wg.Add(1) - go func() { - defer wg.Done() - if _, err := p.Exec(pld); err != nil { - b.Fail() - l.Println(err) - } - }() - } - - wg.Wait() -} - -// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op -func Benchmark_Pool_Echo_Replaced(b *testing.B) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - &Config{ - NumWorkers: 1, - MaxJobs: 1, - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - }, - ) - assert.NoError(b, err) - defer p.Destroy(ctx) - b.ResetTimer() - b.ReportAllocs() - - for n := 0; n < b.N; n++ { - if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - l.Println(err) - } - } -} - -// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op -// BenchmarkToStringUnsafe-32 1000000000 0.4434 ns/op 0 B/op 0 allocs/op -func BenchmarkToStringUnsafe(b *testing.B) { - testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - res := utils.AsString(testPayload) - _ = res - } -} - -// BenchmarkToStringSafe-32 8017846 182.5 ns/op 896 B/op 1 allocs/op -// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op -func BenchmarkToStringSafe(b *testing.B) { - testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj") - - b.ResetTimer() - b.ReportAllocs() - - for i := 0; i < b.N; i++ { - res := toStringNotFun(testPayload) - _ = res - } -} - -func toStringNotFun(data []byte) string { - return string(data) -} diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go deleted file mode 100755 index 59834859..00000000 --- a/pool/supervisor_pool.go +++ /dev/null @@ -1,247 +0,0 @@ -package pool - -import ( - "context" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/events" - "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/state/process" - "github.com/spiral/roadrunner/v2/worker" - "go.uber.org/zap" -) - -const ( - MB = 1024 * 1024 -) - -// NSEC_IN_SEC nanoseconds in second -const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck - -type Supervised interface { - Pool - // Start used to start watching process for all pool workers - Start() -} - -type supervised struct { - cfg *SupervisorConfig - pool Pool - log *zap.Logger - stopCh chan struct{} - mu *sync.RWMutex -} - -func supervisorWrapper(pool Pool, log *zap.Logger, cfg *SupervisorConfig) *supervised { - sp := &supervised{ - cfg: cfg, - pool: pool, - log: log, - mu: &sync.RWMutex{}, - stopCh: make(chan struct{}), - } - - return sp -} - -func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) { - panic("used to satisfy pool interface") -} - -func (sp *supervised) Reset(ctx context.Context) error { - sp.mu.Lock() - defer sp.mu.Unlock() - return sp.pool.Reset(ctx) -} - -func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) { - const op = errors.Op("supervised_exec_with_context") - if sp.cfg.ExecTTL == 0 { - sp.mu.RLock() - defer sp.mu.RUnlock() - return sp.pool.Exec(rqs) - } - - ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) - defer cancel() - - sp.mu.RLock() - defer sp.mu.RUnlock() - res, err := sp.pool.execWithTTL(ctx, rqs) - if err != nil { - return nil, errors.E(op, err) - } - - return res, nil -} - -func (sp *supervised) GetConfig() interface{} { - return sp.pool.GetConfig() -} - -func (sp *supervised) Workers() (workers []worker.BaseProcess) { - sp.mu.Lock() - defer sp.mu.Unlock() - return sp.pool.Workers() -} - -func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { - return sp.pool.RemoveWorker(worker) -} - -func (sp *supervised) Destroy(ctx context.Context) { - sp.Stop() - sp.mu.Lock() - sp.pool.Destroy(ctx) - sp.mu.Unlock() -} - -func (sp *supervised) Start() { - go func() { - watchTout := time.NewTicker(sp.cfg.WatchTick) - defer watchTout.Stop() - - for { - select { - case <-sp.stopCh: - return - // stop here - case <-watchTout.C: - sp.mu.Lock() - sp.control() - sp.mu.Unlock() - } - } - }() -} - -func (sp *supervised) Stop() { - sp.stopCh <- struct{}{} -} - -func (sp *supervised) control() { //nolint:gocognit - now := time.Now() - - // MIGHT BE OUTDATED - // It's a copy of the Workers pointers - workers := sp.pool.Workers() - - for i := 0; i < len(workers); i++ { - // if worker not in the Ready OR working state - // skip such worker - switch workers[i].State().Value() { - case - worker.StateInvalid, - worker.StateErrored, - worker.StateDestroyed, - worker.StateInactive, - worker.StateStopped, - worker.StateStopping, - worker.StateKilling: - continue - } - - s, err := process.WorkerProcessState(workers[i]) - if err != nil { - // worker not longer valid for supervision - continue - } - - if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { - /* - worker at this point might be in the middle of request execution: - - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release - ^ - TTL Reached, state - invalid | - -----> Worker Stopped here - */ - - if workers[i].State().Value() != worker.StateWorking { - workers[i].State().Set(worker.StateInvalid) - _ = workers[i].Stop() - } - // just to double check - workers[i].State().Set(worker.StateInvalid) - sp.log.Debug("ttl", zap.String("reason", "ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String())) - continue - } - - if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { - /* - worker at this point might be in the middle of request execution: - - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release - ^ - TTL Reached, state - invalid | - -----> Worker Stopped here - */ - - if workers[i].State().Value() != worker.StateWorking { - workers[i].State().Set(worker.StateInvalid) - _ = workers[i].Stop() - } - // just to double check - workers[i].State().Set(worker.StateInvalid) - sp.log.Debug("memory_limit", zap.String("reason", "max memory is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventMaxMemory.String())) - continue - } - - // firs we check maxWorker idle - if sp.cfg.IdleTTL != 0 { - // then check for the worker state - if workers[i].State().Value() != worker.StateReady { - continue - } - - /* - Calculate idle time - If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 - 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle - we are guessing that worker overlap idle time and has to be killed - */ - - // 1610530005534416045 lu - // lu - now = -7811150814 - nanoseconds - // 7.8 seconds - // get last used unix nano - lu := workers[i].State().LastUsed() - // worker not used, skip - if lu == 0 { - continue - } - - // convert last used to unixNano and sub time.now to seconds - // negative number, because lu always in the past, except for the `back to the future` :) - res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 - - // maxWorkerIdle more than diff between now and last used - // for example: - // After exec worker goes to the rest - // And resting for the 5 seconds - // IdleTTL is 1 second. - // After the control check, res will be 5, idle is 1 - // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. - if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { - /* - worker at this point might be in the middle of request execution: - - ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release - ^ - TTL Reached, state - invalid | - -----> Worker Stopped here - */ - - if workers[i].State().Value() != worker.StateWorking { - workers[i].State().Set(worker.StateInvalid) - _ = workers[i].Stop() - } - // just to double-check - workers[i].State().Set(worker.StateInvalid) - sp.log.Debug("idle_ttl", zap.String("reason", "idle ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String())) - } - } - } -} diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go deleted file mode 100644 index a479671f..00000000 --- a/pool/supervisor_test.go +++ /dev/null @@ -1,437 +0,0 @@ -package pool - -import ( - "context" - "os" - "os/exec" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/ipc/pipe" - "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/worker" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -var cfgSupervised = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 100 * time.Second, - ExecTTL: 100 * time.Second, - MaxWorkerMemory: 100, - }, -} - -func TestSupervisedPool_Exec(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(log), - cfgSupervised, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second) - - pidBefore := p.Workers()[0].Pid() - - for i := 0; i < 10; i++ { - time.Sleep(time.Second) - _, err = p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - assert.NoError(t, err) - } - - assert.NotEqual(t, pidBefore, p.Workers()[0].Pid()) - - p.Destroy(context.Background()) -} - -func Test_SupervisedPoolReset(t *testing.T) { - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(log), - cfgSupervised, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - - w := p.Workers() - if len(w) == 0 { - t.Fatal("should be workers inside") - } - - pid := w[0].Pid() - require.NoError(t, p.Reset(context.Background())) - - w2 := p.Workers() - if len(w2) == 0 { - t.Fatal("should be workers inside") - } - - require.NotEqual(t, pid, w2[0].Pid()) -} - -// This test should finish without freezes -func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { - var cfgSupervised = cfgSupervised - cfgSupervised.Debug = true - - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") }, - pipe.NewPipeFactory(log), - cfgSupervised, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - time.Sleep(time.Second) - - for i := 0; i < 10; i++ { - time.Sleep(time.Second) - _, err = p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - assert.NoError(t, err) - } - - p.Destroy(context.Background()) -} - -func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 100 * time.Second, - ExecTTL: 1 * time.Second, - MaxWorkerMemory: 100, - }, - } - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(log), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - defer p.Destroy(context.Background()) - - pid := p.Workers()[0].Pid() - - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.Error(t, err) - assert.Empty(t, resp) - - time.Sleep(time.Second * 1) - // should be new worker with new pid - assert.NotEqual(t, pid, p.Workers()[0].Pid()) -} - -func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 5 * time.Second, - }, - } - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") }, - pipe.NewPipeFactory(log), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - pid := p.Workers()[0].Pid() - - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Equal(t, string(resp.Body), "hello world") - assert.Empty(t, resp.Context) - - time.Sleep(time.Second) - assert.NotEqual(t, pid, p.Workers()[0].Pid()) - require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) - pid = p.Workers()[0].Pid() - - resp, err = p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Equal(t, string(resp.Body), "hello world") - assert.Empty(t, resp.Context) - - time.Sleep(time.Second) - // should be new worker with new pid - assert.NotEqual(t, pid, p.Workers()[0].Pid()) - require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) - - p.Destroy(context.Background()) -} - -func TestSupervisedPool_Idle(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 1 * time.Second, - ExecTTL: 100 * time.Second, - MaxWorkerMemory: 100, - }, - } - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") }, - pipe.NewPipeFactory(log), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - pid := p.Workers()[0].Pid() - - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) - - time.Sleep(time.Second * 5) - - // worker should be marked as invalid and reallocated - rsp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - assert.NoError(t, err) - require.NotNil(t, rsp) - time.Sleep(time.Second * 2) - require.Len(t, p.Workers(), 1) - // should be new worker with new pid - assert.NotEqual(t, pid, p.Workers()[0].Pid()) - p.Destroy(context.Background()) -} - -func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 1 * time.Second, - IdleTTL: 1 * time.Second, - MaxWorkerMemory: 100, - }, - } - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(log), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - defer p.Destroy(context.Background()) - - pid := p.Workers()[0].Pid() - - time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) - - time.Sleep(time.Second * 2) - - if len(p.Workers()) < 1 { - t.Fatal("should be at least 1 worker") - return - } - // should be destroyed, state should be Ready, not Invalid - assert.NotEqual(t, pid, p.Workers()[0].Pid()) - assert.Equal(t, int64(1), p.Workers()[0].State().Value()) -} - -func TestSupervisedPool_ExecTTL_OK(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 100 * time.Second, - ExecTTL: 4 * time.Second, - MaxWorkerMemory: 100, - }, - } - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(log), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - defer p.Destroy(context.Background()) - - pid := p.Workers()[0].Pid() - - time.Sleep(time.Millisecond * 100) - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) - - time.Sleep(time.Second * 1) - // should be the same pid - assert.Equal(t, pid, p.Workers()[0].Pid()) -} - -func TestSupervisedPool_MaxMemoryReached(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(1), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 100 * time.Second, - IdleTTL: 100 * time.Second, - ExecTTL: 4 * time.Second, - MaxWorkerMemory: 1, - }, - } - - // constructed - // max memory - // constructed - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(log), - cfgExecTTL, - ) - - assert.NoError(t, err) - assert.NotNil(t, p) - - resp, err := p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - assert.NoError(t, err) - assert.Empty(t, resp.Body) - assert.Empty(t, resp.Context) - - time.Sleep(time.Second) - p.Destroy(context.Background()) -} - -func TestSupervisedPool_AllocateFailedOK(t *testing.T) { - var cfgExecTTL = &Config{ - NumWorkers: uint64(2), - AllocateTimeout: time.Second * 15, - DestroyTimeout: time.Second * 5, - Supervisor: &SupervisorConfig{ - WatchTick: 1 * time.Second, - TTL: 5 * time.Second, - }, - } - - ctx := context.Background() - p, err := NewStaticPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") }, - pipe.NewPipeFactory(log), - cfgExecTTL, - ) - - assert.NoError(t, err) - require.NotNil(t, p) - - time.Sleep(time.Second) - - // should be ok - _, err = p.Exec(&payload.Payload{ - Context: []byte(""), - Body: []byte("foo"), - }) - - require.NoError(t, err) - - // after creating this file, PHP will fail - file, err := os.Create("break") - require.NoError(t, err) - - time.Sleep(time.Second * 5) - assert.NoError(t, file.Close()) - assert.NoError(t, os.Remove("break")) - - defer func() { - if r := recover(); r != nil { - assert.Fail(t, "panic should not be fired!") - } else { - p.Destroy(context.Background()) - } - }() -} |