| author | Valery Piashchynski <[email protected]> | 2021-01-25 16:18:26 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-01-25 16:18:26 +0300 |
| commit | f1875f5715bf7635e17697ae3513ba3d21e4e524 (patch) | |
| tree | f9c0c3876ef542217a8bd7ff17f90bffc018132f /pkg/pool | |
| parent | a063ad05b1cab8ec71eecc32f836efa4d431c6b8 (diff) | |
| parent | 99bf203511b8af4be37186017e2e0c73a030d4f3 (diff) | |
Merge pull request #429 from spiral/2.0
Release 2.0-dev
Diffstat (limited to 'pkg/pool')
| -rw-r--r-- | pkg/pool/config.go | 75 |
| -rw-r--r-- | pkg/pool/interface.go | 29 |
| -rwxr-xr-x | pkg/pool/static_pool.go | 327 |
| -rwxr-xr-x | pkg/pool/static_pool_test.go | 647 |
| -rwxr-xr-x | pkg/pool/supervisor_pool.go | 222 |
| -rw-r--r-- | pkg/pool/supervisor_test.go | 248 |
6 files changed, 1548 insertions, 0 deletions
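
Taken together, these six files introduce pkg/pool as a standalone package. As an orientation aid before the diffs, here is a minimal usage sketch based only on the API shown below; it assumes the package is imported as github.com/spiral/roadrunner/v2/pkg/pool (matching the import style used inside the diff) and uses worker.php as a stand-in for any RoadRunner-compatible PHP worker script.

```go
package main

import (
	"context"
	"log"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/pool"
	"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
)

func main() {
	ctx := context.Background()

	// Initialize a static pool over the pipe transport, mirroring the tests below.
	p, err := pool.Initialize(
		ctx,
		func() *exec.Cmd { return exec.Command("php", "worker.php") }, // placeholder worker script
		pipe.NewPipeFactory(),
		pool.Config{
			NumWorkers:      4,
			AllocateTimeout: time.Second * 5,
			DestroyTimeout:  time.Second * 5,
		},
	)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Destroy(ctx)

	// Execute a single payload and print the response body.
	rsp, err := p.Exec(payload.Payload{Body: []byte("hello")})
	if err != nil {
		log.Fatal(err)
	}
	log.Println(string(rsp.Body))
}
```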
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
new file mode 100644
index 00000000..782f7ce9
--- /dev/null
+++ b/pkg/pool/config.go
@@ -0,0 +1,75 @@
+package pool
+
+import (
+	"runtime"
+	"time"
+)
+
+// Configures the pool behaviour.
+type Config struct {
+	// Debug flag creates new fresh worker before every request.
+	Debug bool
+
+	// NumWorkers defines how many sub-processes can be run at once. This value
+	// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
+	NumWorkers uint64 `mapstructure:"num_workers"`
+
+	// MaxJobs defines how many executions is allowed for the worker until
+	// it's destruction. set 1 to create new process for each new task, 0 to let
+	// worker handle as many tasks as it can.
+	MaxJobs uint64 `mapstructure:"max_jobs"`
+
+	// AllocateTimeout defines for how long pool will be waiting for a worker to
+	// be freed to handle the task. Defaults to 60s.
+	AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`
+
+	// DestroyTimeout defines for how long pool should be waiting for worker to
+	// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
+	DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`
+
+	// Supervision config to limit worker and pool memory usage.
+	Supervisor *SupervisorConfig `mapstructure:"supervisor"`
+}
+
+// InitDefaults enables default config values.
+func (cfg *Config) InitDefaults() {
+	if cfg.NumWorkers == 0 {
+		cfg.NumWorkers = uint64(runtime.NumCPU())
+	}
+
+	if cfg.AllocateTimeout == 0 {
+		cfg.AllocateTimeout = time.Minute
+	}
+
+	if cfg.DestroyTimeout == 0 {
+		cfg.DestroyTimeout = time.Minute
+	}
+	if cfg.Supervisor == nil {
+		return
+	}
+	cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+	// WatchTick defines how often to check the state of worker.
+	WatchTick time.Duration `mapstructure:"watch_tick"`
+
+	// TTL defines maximum time worker is allowed to live.
+	TTL time.Duration `mapstructure:"ttl"`
+
+	// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+	IdleTTL time.Duration `mapstructure:"idle_ttl"`
+
+	// ExecTTL defines maximum lifetime per job.
+	ExecTTL time.Duration `mapstructure:"exec_ttl"`
+
+	// MaxWorkerMemory limits memory per worker.
+	MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+	if cfg.WatchTick == 0 {
+		cfg.WatchTick = time.Second
+	}
+}
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
new file mode 100644
index 00000000..4f7ae595
--- /dev/null
+++ b/pkg/pool/interface.go
@@ -0,0 +1,29 @@
+package pool
+
+import (
+	"context"
+
+	"github.com/spiral/roadrunner/v2/pkg/payload"
+	"github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+// Pool managed set of inner worker processes.
+type Pool interface {
+	// GetConfig returns pool configuration.
+	GetConfig() interface{}
+
+	// Exec executes task with payload
+	Exec(rqs payload.Payload) (payload.Payload, error)
+
+	// ExecWithContext executes task with context which is used with timeout
+	ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
+
+	// Workers returns worker list associated with the pool.
+	Workers() (workers []worker.SyncWorker)
+
+	// Remove worker from the pool.
+	RemoveWorker(worker worker.SyncWorker) error
+
+	// Destroy all underlying stack (but let them to complete the task).
+ Destroy(ctx context.Context) +} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go new file mode 100755 index 00000000..44adf9c0 --- /dev/null +++ b/pkg/pool/static_pool.go @@ -0,0 +1,327 @@ +package pool + +import ( + "context" + "os/exec" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/transport" + "github.com/spiral/roadrunner/v2/pkg/worker" + workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" +) + +// StopRequest can be sent by worker to indicate that restart is required. +const StopRequest = "{\"stop\":true}" + +// ErrorEncoder encode error or make a decision based on the error type +type ErrorEncoder func(err error, w worker.SyncWorker) (payload.Payload, error) + +type Options func(p *StaticPool) + +type Command func() *exec.Cmd + +// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. +type StaticPool struct { + cfg Config + + // worker command creator + cmd Command + + // creates and connects to stack + factory transport.Factory + + // distributes the events + events events.Handler + + // saved list of event listeners + listeners []events.Listener + + // manages worker states and TTLs + ww workerWatcher.Watcher + + // allocate new worker + allocator worker.Allocator + + // errEncoder is the default Exec error encoder + errEncoder ErrorEncoder +} + +// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker. +func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error) { + const op = errors.Op("static_pool_initialize") + if factory == nil { + return nil, errors.E(op, errors.Str("no factory initialized")) + } + cfg.InitDefaults() + + if cfg.Debug { + cfg.NumWorkers = 0 + cfg.MaxJobs = 1 + } + + p := &StaticPool{ + cfg: cfg, + cmd: cmd, + factory: factory, + events: events.NewEventsHandler(), + } + + // add pool options + for i := 0; i < len(options); i++ { + options[i](p) + } + + p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + + workers, err := p.allocateWorkers(p.cfg.NumWorkers) + if err != nil { + return nil, errors.E(op, err) + } + + // put stack in the pool + err = p.ww.AddToWatch(workers) + if err != nil { + return nil, errors.E(op, err) + } + + p.errEncoder = defaultErrEncoder(p) + + // if supervised config not nil, guess, that pool wanted to be supervised + if cfg.Supervisor != nil { + sp := supervisorWrapper(p, p.events, p.cfg.Supervisor) + // start watcher timer + sp.Start() + return sp, nil + } + + return p, nil +} + +func AddListeners(listeners ...events.Listener) Options { + return func(p *StaticPool) { + p.listeners = listeners + for i := 0; i < len(listeners); i++ { + p.addListener(listeners[i]) + } + } +} + +// AddListener connects event listener to the pool. +func (sp *StaticPool) addListener(listener events.Listener) { + sp.events.AddListener(listener) +} + +// Config returns associated pool configuration. Immutable. +func (sp *StaticPool) GetConfig() interface{} { + return sp.cfg +} + +// Workers returns worker list associated with the pool. 
+func (sp *StaticPool) Workers() (workers []worker.SyncWorker) { + return sp.ww.WorkersList() +} + +func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error { + return sp.ww.RemoveWorker(wb) +} + +// Be careful, sync Exec with ExecWithContext +func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { + const op = errors.Op("static_pool_exec") + if sp.cfg.Debug { + return sp.execDebug(p) + } + ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.getWorker(ctxGetFree, op) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + + rsp, err := w.Exec(p) + if err != nil { + return sp.errEncoder(err, w) + } + + // worker want's to be terminated + // TODO careful with string(rsp.Context) + if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { + sp.stopWorker(w) + + return sp.Exec(p) + } + + err = sp.checkMaxJobs(w) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + + return rsp, nil +} + +// Be careful, sync with pool.Exec method +func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { + const op = errors.Op("static_pool_exec_with_context") + ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.getWorker(ctxGetFree, op) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + + rsp, err := w.ExecWithTimeout(ctx, p) + if err != nil { + return sp.errEncoder(err, w) + } + + // worker want's to be terminated + if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { + sp.stopWorker(w) + return sp.ExecWithContext(ctx, p) + } + + err = sp.checkMaxJobs(w) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + + return rsp, nil +} + +func (sp *StaticPool) stopWorker(w worker.SyncWorker) { + const op = errors.Op("static_pool_stop_worker") + w.State().Set(internal.StateInvalid) + err := w.Stop() + if err != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + } +} + +// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs +func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { + const op = errors.Op("static_pool_check_max_jobs") + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err := sp.ww.AllocateNew() + if err != nil { + return errors.E(op, err) + } + } else { + sp.ww.PushWorker(w) + } + return nil +} + +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { + // GetFreeWorker function consumes context with timeout + w, err := sp.ww.GetFreeWorker(ctxGetFree) + if err != nil { + // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout + if errors.Is(errors.NoFreeWorkers, err) { + sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Payload: errors.E(op, err)}) + return nil, errors.E(op, err) + } + // else if err not nil - return error + return nil, errors.E(op, err) + } + return w, nil +} + +// Destroy all underlying stack (but let them to complete the task). 
+func (sp *StaticPool) Destroy(ctx context.Context) { + sp.ww.Destroy(ctx) +} + +func defaultErrEncoder(sp *StaticPool) ErrorEncoder { + return func(err error, w worker.SyncWorker) (payload.Payload, error) { + const op = errors.Op("error encoder") + // just push event if on any stage was timeout error + if errors.Is(errors.ExecTTL, err) { + sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) + } + // soft job errors are allowed + if errors.Is(errors.SoftJob, err) { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew() + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) + } + + w.State().Set(internal.StateInvalid) + err = w.Stop() + if err != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + } + } else { + sp.ww.PushWorker(w) + } + + return payload.Payload{}, errors.E(op, err) + } + + w.State().Set(internal.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + errS := w.Stop() + + if errS != nil { + return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) + } + + return payload.Payload{}, errors.E(op, err) + } +} + +func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { + return func() (*worker.SyncWorkerImpl, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...) + if err != nil { + return nil, err + } + + sw := worker.From(w) + + sp.events.Push(events.PoolEvent{ + Event: events.EventWorkerConstruct, + Payload: sw, + }) + return sw, nil + } +} + +func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { + sw, err := sp.allocator() + if err != nil { + return payload.Payload{}, err + } + + r, err := sw.Exec(p) + + if stopErr := sw.Stop(); stopErr != nil { + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + } + + return r, err +} + +// allocate required number of stack +func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) { + const op = errors.Op("allocate workers") + var workers []worker.SyncWorker + + // constant number of stack simplify logic + for i := uint64(0); i < numWorkers; i++ { + w, err := sp.allocator() + if err != nil { + return nil, errors.E(op, errors.WorkerAllocate, err) + } + + workers = append(workers, w) + } + return workers, nil +} diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go new file mode 100755 index 00000000..a32790e0 --- /dev/null +++ b/pkg/pool/static_pool_test.go @@ -0,0 +1,647 @@ +package pool + +import ( + "context" + "log" + "os/exec" + "runtime" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/stretchr/testify/assert" +) + +var cfg = Config{ + NumWorkers: uint64(runtime.NumCPU()), + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, +} + +func Test_NewPool(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + 
pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) +} + +func Test_StaticPool_Invalid(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, + pipe.NewPipeFactory(), + cfg, + ) + + assert.Nil(t, p) + assert.Error(t, err) +} + +func Test_ConfigNoErrorInitDefaults(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) +} + +func Test_StaticPool_Echo(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_NilContext(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: nil}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_Context(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello"), Context: []byte("world")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.Empty(t, res.Body) + assert.NotNil(t, res.Context) + + assert.Equal(t, "world", string(res.Context)) +} + +func Test_StaticPool_JobError(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.Exec") + } + + assert.Contains(t, err.Error(), "hello") +} + +func Test_StaticPool_Broken_Replace(t *testing.T) { + ctx := context.Background() + block := make(chan struct{}, 1) + + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerLog { + e := string(wev.Payload.([]byte)) + if strings.ContainsAny(e, "undefined_function()") { + block <- struct{}{} + return + } + } + } + } + + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, + pipe.NewPipeFactory(), + cfg, + 
AddListeners(listener), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + time.Sleep(time.Second) + res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + + <-block + + p.Destroy(ctx) +} + +func Test_StaticPool_Broken_FromOutside(t *testing.T) { + ctx := context.Background() + // Consume pool events + ev := make(chan struct{}, 1) + listener := func(event interface{}) { + if pe, ok := event.(events.PoolEvent); ok { + if pe.Event == events.EventWorkerConstruct { + ev <- struct{}{} + } + } + } + + var cfg = Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + } + + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + AddListeners(listener), + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) + assert.Equal(t, 1, len(p.Workers())) + + // first creation + <-ev + // killing random worker and expecting pool to replace it + err = p.Workers()[0].Kill() + if err != nil { + t.Errorf("error killing the process: error %v", err) + } + + // re-creation + <-ev + + list := p.Workers() + for _, w := range list { + assert.Equal(t, internal.StateReady, w.State().Value()) + } +} + +func Test_StaticPool_AllocateTimeout(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Nanosecond * 1, + DestroyTimeout: time.Second * 2, + }, + ) + assert.Error(t, err) + if !errors.Is(errors.WorkerAllocate, err) { + t.Fatal("error should be of type WorkerAllocate") + } + assert.Nil(t, p) +} + +func Test_StaticPool_Replace_Worker(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + var lastPID string + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + + res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +func Test_StaticPool_Debug_Worker(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + Config{ + Debug: true, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + assert.Len(t, p.Workers(), 0) + + var lastPID string + res, _ := p.Exec(payload.Payload{Body: []byte("hello")}) + assert.NotEqual(t, lastPID, string(res.Body)) + + assert.Len(t, p.Workers(), 0) + + for i := 0; 
i < 10; i++ { + assert.Len(t, p.Workers(), 0) + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +// identical to replace but controlled on worker side +func Test_StaticPool_Stop_Worker(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + var lastPID string + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Destroy_And_Close(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + p.Destroy(ctx) + _, err = p.Exec(payload.Payload{Body: []byte("100")}) + assert.Error(t, err) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + go func() { + _, err := p.Exec(payload.Payload{Body: []byte("100")}) + if err != nil { + t.Errorf("error executing payload: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + p.Destroy(ctx) + _, err = p.Exec(payload.Payload{Body: []byte("100")}) + assert.Error(t, err) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Handle_Dead(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + for i := range p.Workers() { + p.Workers()[i].State().Set(internal.StateErrored) + } + + _, err = p.Exec(payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + p.Destroy(ctx) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Slow_Destroy(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + 
DestroyTimeout: time.Second, + }, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + p.Destroy(context.Background()) +} + +func Test_StaticPool_NoFreeWorkers(t *testing.T) { + ctx := context.Background() + block := make(chan struct{}, 1) + + listener := func(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventNoFreeWorkers { + block <- struct{}{} + } + } + } + + p, err := Initialize( + ctx, + // sleep for the 3 seconds + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + Config{ + Debug: false, + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: nil, + }, + AddListeners(listener), + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + go func() { + _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + }() + + time.Sleep(time.Second) + res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + + <-block + + p.Destroy(ctx) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_WrongCommand1(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("phg", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Error(t, err) + assert.Nil(t, p) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_WrongCommand2(t *testing.T) { + p, err := Initialize( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Error(t, err) + assert.Nil(t, p) +} + +func Benchmark_Pool_Echo(b *testing.B) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + for n := 0; n < b.N; n++ { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +// +func Benchmark_Pool_Echo_Batched(b *testing.B) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: uint64(runtime.NumCPU()), + AllocateTimeout: time.Second * 100, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(b, err) + defer p.Destroy(ctx) + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + log.Println(err) + } + }() + } + + wg.Wait() +} + +// +func Benchmark_Pool_Echo_Replaced(b *testing.B) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(b, err) + defer p.Destroy(ctx) + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + if _, err := p.Exec(payload.Payload{Body: []byte("hello")}); err != nil 
{ + b.Fail() + log.Println(err) + } + } +} diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go new file mode 100755 index 00000000..2597b352 --- /dev/null +++ b/pkg/pool/supervisor_pool.go @@ -0,0 +1,222 @@ +package pool + +import ( + "context" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/tools" +) + +const MB = 1024 * 1024 + +// NSEC_IN_SEC nanoseconds in second +const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck + +type Supervised interface { + Pool + // Start used to start watching process for all pool workers + Start() +} + +type supervised struct { + cfg *SupervisorConfig + events events.Handler + pool Pool + stopCh chan struct{} + mu *sync.RWMutex +} + +func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { + sp := &supervised{ + cfg: cfg, + events: events, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), + } + + return sp +} + +type ttlExec struct { + err error + p payload.Payload +} + +func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { + const op = errors.Op("supervised_exec_with_context") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + c := make(chan ttlExec, 1) + ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) + defer cancel() + go func() { + res, err := sp.pool.ExecWithContext(ctx, rqs) + if err != nil { + c <- ttlExec{ + err: errors.E(op, err), + p: payload.Payload{}, + } + } + + c <- ttlExec{ + err: nil, + p: res, + } + }() + + for { + select { + case <-ctx.Done(): + return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) + case res := <-c: + if res.err != nil { + return payload.Payload{}, res.err + } + + return res.p, nil + } + } +} + +func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) { + const op = errors.Op("supervised_exec") + rsp, err := sp.pool.Exec(p) + if err != nil { + return payload.Payload{}, errors.E(op, err) + } + return rsp, nil +} + +func (sp *supervised) GetConfig() interface{} { + return sp.pool.GetConfig() +} + +func (sp *supervised) Workers() (workers []worker.SyncWorker) { + sp.mu.Lock() + defer sp.mu.Unlock() + return sp.pool.Workers() +} + +func (sp *supervised) RemoveWorker(worker worker.SyncWorker) error { + return sp.pool.RemoveWorker(worker) +} + +func (sp *supervised) Destroy(ctx context.Context) { + sp.pool.Destroy(ctx) +} + +func (sp *supervised) Start() { + go func() { + watchTout := time.NewTicker(sp.cfg.WatchTick) + for { + select { + case <-sp.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + sp.mu.Lock() + sp.control() + sp.mu.Unlock() + } + } + }() +} + +func (sp *supervised) Stop() { + sp.stopCh <- struct{}{} +} + +func (sp *supervised) control() { + now := time.Now() + const op = errors.Op("supervised_pool_control_tick") + + // THIS IS A COPY OF WORKERS + workers := sp.pool.Workers() + + for i := 0; i < len(workers); i++ { + if workers[i].State().Value() == internal.StateInvalid { + continue + } + + s, err := tools.WorkerProcessState(workers[i]) + if err != nil { + // worker not longer valid for supervision + continue + } + + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + 
sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + continue + } + + if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + continue + } + + // firs we check maxWorker idle + if sp.cfg.IdleTTL != 0 { + // then check for the worker state + if workers[i].State().Value() != internal.StateReady { + continue + } + + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + + // 1610530005534416045 lu + // lu - now = -7811150814 - nanoseconds + // 7.8 seconds + // get last used unix nano + lu := workers[i].State().LastUsed() + // worker not used, skip + if lu == 0 { + continue + } + + // convert last used to unixNano and sub time.now to seconds + // negative number, because lu always in the past, except for the `back to the future` :) + res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 + + // maxWorkerIdle more than diff between now and last used + // for example: + // After exec worker goes to the rest + // And resting for the 5 seconds + // IdleTTL is 1 second. + // After the control check, res will be 5, idle is 1 + // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. + if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + } + } + } +} diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go new file mode 100644 index 00000000..c67d5d91 --- /dev/null +++ b/pkg/pool/supervisor_test.go @@ -0,0 +1,248 @@ +package pool + +import ( + "context" + "os/exec" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/spiral/roadrunner/v2/tools" + "github.com/stretchr/testify/assert" +) + +var cfgSupervised = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 100 * time.Second, + MaxWorkerMemory: 100, + }, +} + +func TestSupervisedPool_Exec(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgSupervised, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + stopCh := make(chan struct{}) + defer p.Destroy(context.Background()) + + go func() { + for { + select { + case <-stopCh: + return + default: + workers := p.Workers() + if len(workers) > 0 { + s, err := tools.WorkerProcessState(workers[0]) + assert.NoError(t, err) + assert.NotNil(t, s) + // since this is soft limit, double max memory limit watch + if (s.MemoryUsage / MB) > 
cfgSupervised.Supervisor.MaxWorkerMemory*2 { + assert.Fail(t, "max memory reached") + } + } + } + } + }() + + for i := 0; i < 100; i++ { + time.Sleep(time.Millisecond * 50) + _, err = p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) + } + + stopCh <- struct{}{} +} + +func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 1 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Error(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 1) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) +} + +func TestSupervisedPool_Idle(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 1 * time.Second, + ExecTTL: 100 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + pid := p.Workers()[0].Pid() + + resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Nil(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 5) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + p.Destroy(context.Background()) +} + +func TestSupervisedPool_ExecTTL_OK(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + time.Sleep(time.Millisecond * 100) + resp, err := p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 1) + // should be the same pid + assert.Equal(t, pid, p.Workers()[0].Pid()) +} + +func TestSupervisedPool_MaxMemoryReached(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 
* time.Second, + ExecTTL: 4 * time.Second, + MaxWorkerMemory: 1, + }, + } + + block := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventMaxMemory { + block <- struct{}{} + } + } + } + + // constructed + // max memory + // constructed + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + AddListeners(listener), + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + resp, err := p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + <-block + p.Destroy(context.Background()) +} |
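
The sketches that follow are illustrative only and assume the same github.com/spiral/roadrunner/v2/pkg/... import layout used throughout the diff. First, Config and SupervisorConfig from pkg/pool/config.go are plain structs whose zero values are filled in by InitDefaults: NumWorkers becomes runtime.NumCPU(), both timeouts one minute, and the supervisor's WatchTick one second.

```go
package main

import (
	"fmt"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/pool"
)

func main() {
	cfg := pool.Config{
		// NumWorkers, AllocateTimeout and DestroyTimeout are left zero on purpose.
		Supervisor: &pool.SupervisorConfig{
			TTL:             100 * time.Second,
			MaxWorkerMemory: 128, // soft limit in MB, checked by the supervisor
		},
	}
	cfg.InitDefaults()

	// Prints runtime.NumCPU(), 1m0s, 1m0s, 1s.
	fmt.Println(cfg.NumWorkers, cfg.AllocateTimeout, cfg.DestroyTimeout, cfg.Supervisor.WatchTick)
}
```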
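Event delivery is push based: pool.AddListeners is the only exported option on StaticPool, and the same listener receives both events.PoolEvent and events.WorkerEvent values. A hedged sketch (loggingListeners is a hypothetical helper name):

```go
package example

import (
	"log"

	"github.com/spiral/roadrunner/v2/pkg/events"
	"github.com/spiral/roadrunner/v2/pkg/pool"
)

// loggingListeners returns an option that logs pool-level and worker-level
// events; pass the result as a trailing argument to pool.Initialize.
func loggingListeners() pool.Options {
	return pool.AddListeners(func(event interface{}) {
		switch e := event.(type) {
		case events.PoolEvent:
			log.Printf("pool event: %v payload: %v", e.Event, e.Payload)
		case events.WorkerEvent:
			log.Printf("worker event: %v", e.Event)
		}
	})
}
```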
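Exec derives its allocation deadline from AllocateTimeout, while ExecWithContext additionally forwards the caller's context to the worker. Errors come back wrapped with spiral/errors kinds, so callers can branch on them the same way the tests do. A hedged sketch (execOnce is a hypothetical helper):

```go
package example

import (
	"context"
	"log"
	"time"

	"github.com/spiral/errors"
	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/pool"
)

// execOnce runs one task with a caller-side deadline and branches on the
// error kinds the pool returns.
func execOnce(p pool.Pool, body []byte, deadline time.Duration) (payload.Payload, error) {
	ctx, cancel := context.WithTimeout(context.Background(), deadline)
	defer cancel()

	rsp, err := p.ExecWithContext(ctx, payload.Payload{Body: body})
	switch {
	case err == nil:
		return rsp, nil
	case errors.Is(errors.NoFreeWorkers, err):
		// No worker freed up within AllocateTimeout (EventNoFreeWorkers is also emitted).
		log.Println("pool saturated:", err)
	case errors.Is(errors.SoftJob, err):
		// PHP-level failure; the worker itself stays in the pool (see defaultErrEncoder).
		log.Println("job failed:", err)
	case errors.Is(errors.TimeOut, err):
		// Returned by the supervised wrapper when ExecTTL or the caller's deadline expires.
		log.Println("job timed out:", err)
	}
	return payload.Payload{}, err
}
```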
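A payload carries two byte slices: Body and an opaque Context, which Test_StaticPool_Echo_Context shows travelling independently (the "head" mode of tests/client.php echoes Context back). A hedged sketch (sendWithMeta is a hypothetical helper):

```go
package example

import (
	"context"
	"log"

	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/pool"
)

// sendWithMeta ships opaque metadata in Context next to the Body; what the
// worker does with it is entirely up to the PHP side.
func sendWithMeta(ctx context.Context, p pool.Pool, meta, body []byte) error {
	rsp, err := p.ExecWithContext(ctx, payload.Payload{Context: meta, Body: body})
	if err != nil {
		return err
	}
	log.Printf("body=%q context=%q", rsp.Body, rsp.Context)
	return nil
}
```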
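Worker rotation is configuration driven: with MaxJobs set to 1, checkMaxJobs retires the worker after every execution and the watcher allocates a fresh process, which is exactly what Test_StaticPool_Replace_Worker asserts through changing PIDs. A hedged config sketch (rotateEveryJob is an illustrative name):

```go
package example

import (
	"time"

	"github.com/spiral/roadrunner/v2/pkg/pool"
)

// rotateEveryJob recycles the worker process after every single execution;
// each replacement also fires events.EventWorkerConstruct.
var rotateEveryJob = pool.Config{
	NumWorkers:      1,
	MaxJobs:         1,
	AllocateTimeout: time.Second,
	DestroyTimeout:  time.Second,
}
```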
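Debug mode goes one step further: Initialize rewrites a Debug pool to NumWorkers 0 and MaxJobs 1, so execDebug spawns, uses and stops a worker per request and Workers() stays empty in between (see Test_StaticPool_Debug_Worker). A hedged config sketch (debugCfg is an illustrative name):

```go
package example

import (
	"time"

	"github.com/spiral/roadrunner/v2/pkg/pool"
)

// debugCfg trades throughput for isolation: every Exec gets a brand-new
// worker process, which is handy while iterating on the PHP side.
var debugCfg = pool.Config{
	Debug:           true,
	AllocateTimeout: time.Second,
	DestroyTimeout:  time.Second,
}
```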
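Benchmark_Pool_Echo_Batched drives one pool from many goroutines at once, i.e. Exec is safe for concurrent callers and queuing is handled by the worker watcher. A hedged sketch of the same fan-out pattern (fanOut is a hypothetical helper):

```go
package example

import (
	"log"
	"sync"

	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/pool"
)

// fanOut submits n payloads concurrently. Each Exec blocks until a worker is
// free or AllocateTimeout expires, so bursty callers should size the timeout
// generously, as Benchmark_Pool_Echo_Batched does (100s).
func fanOut(p pool.Pool, n int, body []byte) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := p.Exec(payload.Payload{Body: body}); err != nil {
				log.Println("exec failed:", err)
			}
		}()
	}
	wg.Wait()
}
```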
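When Config.Supervisor is set, Initialize returns the supervised wrapper with its watcher already started; ExecWithContext then races every job against ExecTTL, and the periodic control loop retires workers on TTL, IdleTTL and MaxWorkerMemory, emitting the corresponding pool events. A hedged end-to-end sketch (worker.php is again a placeholder script):

```go
package main

import (
	"context"
	"log"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner/v2/pkg/events"
	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/pool"
	"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
)

func main() {
	ctx := context.Background()

	// Log every supervision decision the control loop makes.
	listener := func(event interface{}) {
		pe, ok := event.(events.PoolEvent)
		if !ok {
			return
		}
		switch pe.Event {
		case events.EventTTL, events.EventIdleTTL, events.EventMaxMemory:
			log.Printf("worker retired by supervisor: %v", pe.Event)
		case events.EventExecTTL:
			log.Printf("job exceeded ExecTTL: %v", pe.Payload)
		case events.EventSupervisorError:
			log.Printf("supervisor error: %v", pe.Payload)
		}
	}

	p, err := pool.Initialize(
		ctx,
		func() *exec.Cmd { return exec.Command("php", "worker.php") }, // placeholder worker script
		pipe.NewPipeFactory(),
		pool.Config{
			NumWorkers:      4,
			AllocateTimeout: time.Second,
			DestroyTimeout:  time.Second,
			Supervisor: &pool.SupervisorConfig{
				WatchTick:       time.Second,
				TTL:             100 * time.Second,
				IdleTTL:         30 * time.Second,
				ExecTTL:         10 * time.Second,
				MaxWorkerMemory: 128, // MB, soft limit
			},
		},
		pool.AddListeners(listener),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Destroy(ctx)

	// ExecWithContext is the entry point that enforces ExecTTL.
	if _, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("work")}); err != nil {
		log.Println("exec:", err)
	}
}
```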
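The IdleTTL branch of control() works in whole seconds: it reads the worker's LastUsed timestamp (unix nanoseconds), converts the distance to now into seconds, and removes the worker once that meets or exceeds IdleTTL. A hedged restatement of the arithmetic in isolation (idleExpired is a hypothetical helper mirroring the code above):

```go
package example

import "time"

// nsecInSec mirrors the NSEC_IN_SEC constant from supervisor_pool.go.
const nsecInSec int64 = 1000000000

// idleExpired reproduces the IdleTTL comparison from control(): lastUsed is
// State().LastUsed() in unix nanoseconds, now is the current watch tick.
// A zero lastUsed means the worker has not served a job yet and is skipped.
func idleExpired(lastUsed uint64, now time.Time, idleTTL time.Duration) bool {
	if lastUsed == 0 {
		return false
	}
	// Whole seconds since the worker was last used (lastUsed lies in the past,
	// hence the sign flip).
	elapsed := ((int64(lastUsed) - now.UnixNano()) / nsecInSec) * -1
	// Example from the comments above: resting for 5s with IdleTTL of 1s gives
	// 1 - 5 = -4, which is <= 0, so the worker is removed.
	return int64(idleTTL.Seconds())-elapsed <= 0
}
```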
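Finally, the supervised tests poll tools.WorkerProcessState to watch worker memory from the outside; MaxWorkerMemory is a soft limit evaluated once per WatchTick, which is why TestSupervisedPool_Exec allows double the configured value before failing. A hedged sketch of the same inspection for ad-hoc debugging (logWorkerMemory is a hypothetical helper):

```go
package example

import (
	"log"

	"github.com/spiral/roadrunner/v2/pkg/pool"
	"github.com/spiral/roadrunner/v2/tools"
)

const mb = 1024 * 1024

// logWorkerMemory prints the resident memory of every live worker; the
// supervisor performs the equivalent check internally on every WatchTick.
func logWorkerMemory(p pool.Pool) {
	for _, w := range p.Workers() {
		s, err := tools.WorkerProcessState(w)
		if err != nil {
			continue // the worker may have just been stopped
		}
		log.Printf("pid %d uses %d MB", w.Pid(), s.MemoryUsage/mb)
	}
}
```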