diff options
author | Valery Piashchynski <[email protected]> | 2020-12-17 02:34:44 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-17 02:34:44 +0300 |
commit | 9d5fe4f6a98b30fd73be8259f84fa595ac994a71 (patch) | |
tree | e49c46b03d8facc73e96f1b6247d83367cc65398 /pkg/pool | |
parent | 1033c25b6bfc752d6059e446510f651e22cbf49b (diff) |
huge refactor
Diffstat (limited to 'pkg/pool')
-rw-r--r-- | pkg/pool/config.go | 75 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 351 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 558 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 207 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 154 |
5 files changed, 1345 insertions, 0 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go new file mode 100644 index 00000000..3dcc3584 --- /dev/null +++ b/pkg/pool/config.go @@ -0,0 +1,75 @@ +package pool + +import ( + "runtime" + "time" +) + +// Configures the pool behaviour. +type Config struct { + // Debug flag creates new fresh worker before every request. + Debug bool + + // NumWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. + NumWorkers int64 + + // MaxJobs defines how many executions is allowed for the worker until + // it's destruction. set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. + MaxJobs int64 + + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. Defaults to 60s. + AllocateTimeout time.Duration + + // DestroyTimeout defines for how long pool should be waiting for worker to + // properly destroy, if timeout reached worker will be killed. Defaults to 60s. + DestroyTimeout time.Duration + + // Supervision config to limit worker and pool memory usage. + Supervisor *SupervisorConfig +} + +// InitDefaults enables default config values. +func (cfg *Config) InitDefaults() { + if cfg.NumWorkers == 0 { + cfg.NumWorkers = int64(runtime.NumCPU()) + } + + if cfg.AllocateTimeout == 0 { + cfg.AllocateTimeout = time.Minute + } + + if cfg.DestroyTimeout == 0 { + cfg.DestroyTimeout = time.Minute + } + if cfg.Supervisor == nil { + return + } + cfg.Supervisor.InitDefaults() +} + +type SupervisorConfig struct { + // WatchTick defines how often to check the state of worker. + WatchTick uint64 + + // TTL defines maximum time worker is allowed to live. + TTL uint64 + + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. + IdleTTL uint64 + + // ExecTTL defines maximum lifetime per job. + ExecTTL uint64 + + // MaxWorkerMemory limits memory per worker. + MaxWorkerMemory uint64 +} + +// InitDefaults enables default config values. +func (cfg *SupervisorConfig) InitDefaults() { + if cfg.WatchTick == 0 { + cfg.WatchTick = 1 + } +} diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go new file mode 100755 index 00000000..220ea8e9 --- /dev/null +++ b/pkg/pool/static_pool.go @@ -0,0 +1,351 @@ +package pool + +import ( + "context" + "os/exec" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + syncWorker "github.com/spiral/roadrunner/v2/pkg/worker" + workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" + "github.com/spiral/roadrunner/v2/util" +) + +// StopRequest can be sent by worker to indicate that restart is required. +const StopRequest = "{\"stop\":true}" + +var bCtx = context.Background() + +// ErrorEncoder encode error or make a decision based on the error type +type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error) + +// Before is set of functions that executes BEFORE Exec +type Before func(req internal.Payload) internal.Payload + +// After is set of functions that executes AFTER Exec +type After func(req internal.Payload, resp internal.Payload) internal.Payload + +type Options func(p *StaticPool) + +// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. +type StaticPool struct { + cfg Config + + // worker command creator + cmd func() *exec.Cmd + + // creates and connects to stack + factory worker.Factory + + // distributes the events + events worker.EventsHandler + + // manages worker states and TTLs + ww worker.Watcher + + // allocate new worker + allocator worker.Allocator + + errEncoder ErrorEncoder + before []Before + after []After +} + +// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. +func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) { + const op = errors.Op("NewPool") + if factory == nil { + return nil, errors.E(op, errors.Str("no factory initialized")) + } + cfg.InitDefaults() + + if cfg.Debug { + cfg.NumWorkers = 0 + cfg.MaxJobs = 1 + } + + p := &StaticPool{ + cfg: cfg, + cmd: cmd, + factory: factory, + events: util.NewEventsHandler(), + after: make([]After, 0, 0), + before: make([]Before, 0, 0), + } + + p.allocator = newPoolAllocator(factory, cmd) + p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + + workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers) + if err != nil { + return nil, errors.E(op, err) + } + + // put stack in the pool + err = p.ww.AddToWatch(workers) + if err != nil { + return nil, errors.E(op, err) + } + + p.errEncoder = defaultErrEncoder(p) + + // add pool options + for i := 0; i < len(options); i++ { + options[i](p) + } + + // if supervised config not nil, guess, that pool wanted to be supervised + if cfg.Supervisor != nil { + sp := newPoolWatcher(p, p.events, p.cfg.Supervisor) + // start watcher timer + sp.Start() + return sp, nil + } + + return p, nil +} + +func ExecBefore(before ...Before) Options { + return func(p *StaticPool) { + p.before = append(p.before, before...) + } +} + +func ExecAfter(after ...After) Options { + return func(p *StaticPool) { + p.after = append(p.after, after...) + } +} + +// AddListener connects event listener to the pool. +func (sp *StaticPool) AddListener(listener worker.EventListener) { + sp.events.AddListener(listener) +} + +// Config returns associated pool configuration. Immutable. +func (sp *StaticPool) GetConfig() interface{} { + return sp.cfg +} + +// Workers returns worker list associated with the pool. +func (sp *StaticPool) Workers() (workers []worker.BaseProcess) { + return sp.ww.WorkersList() +} + +func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error { + return sp.ww.RemoveWorker(wb) +} + +func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) { + const op = errors.Op("exec") + if sp.cfg.Debug { + return sp.execDebug(p) + } + ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.getWorker(ctxGetFree, op) + if err != nil { + return internal.Payload{}, errors.E(op, err) + } + + sw := w.(worker.SyncWorker) + + if len(sp.before) > 0 { + for i := 0; i < len(sp.before); i++ { + p = sp.before[i](p) + } + } + + rsp, err := sw.Exec(p) + if err != nil { + return sp.errEncoder(err, sw) + } + + // worker want's to be terminated + // TODO careful with string(rsp.Context) + if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { + sw.State().Set(internal.StateInvalid) + err = sw.Stop(bCtx) + if err != nil { + sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + } + + return sp.Exec(p) + } + + if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew() + if err != nil { + return internal.Payload{}, errors.E(op, err) + } + } else { + sp.ww.PushWorker(sw) + } + + if len(sp.after) > 0 { + for i := 0; i < len(sp.after); i++ { + rsp = sp.after[i](p, rsp) + } + } + + return rsp, nil +} + +func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) { + const op = errors.Op("exec with context") + ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.getWorker(ctxGetFree, op) + if err != nil { + return internal.Payload{}, errors.E(op, err) + } + + sw := w.(worker.SyncWorker) + + // apply all before function + if len(sp.before) > 0 { + for i := 0; i < len(sp.before); i++ { + rqs = sp.before[i](rqs) + } + } + + rsp, err := sw.ExecWithContext(ctx, rqs) + if err != nil { + return sp.errEncoder(err, sw) + } + + // worker want's to be terminated + if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { + sw.State().Set(internal.StateInvalid) + err = sw.Stop(bCtx) + if err != nil { + sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + } + + return sp.Exec(rqs) + } + + if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew() + if err != nil { + return internal.Payload{}, errors.E(op, err) + } + } else { + sp.ww.PushWorker(sw) + } + + // apply all after functions + if len(sp.after) > 0 { + for i := 0; i < len(sp.after); i++ { + rsp = sp.after[i](rqs, rsp) + } + } + + return rsp, nil +} + +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { + // GetFreeWorker function consumes context with timeout + w, err := sp.ww.GetFreeWorker(ctxGetFree) + if err != nil { + // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout + if errors.Is(errors.NoFreeWorkers, err) { + sp.events.Push(pool.Event{Event: pool.EventNoFreeWorkers, Payload: errors.E(op, err)}) + return nil, errors.E(op, err) + } + // else if err not nil - return error + return nil, errors.E(op, err) + } + return w, nil +} + +// Destroy all underlying stack (but let them to complete the task). +func (sp *StaticPool) Destroy(ctx context.Context) { + sp.ww.Destroy(ctx) +} + +func defaultErrEncoder(sp *StaticPool) ErrorEncoder { + return func(err error, w worker.BaseProcess) (internal.Payload, error) { + const op = errors.Op("error encoder") + // soft job errors are allowed + if errors.Is(errors.ErrSoftJob, err) { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew() + if err != nil { + sp.events.Push(pool.Event{Event: pool.EventWorkerConstruct, Payload: errors.E(op, err)}) + } + + w.State().Set(internal.StateInvalid) + err = w.Stop(bCtx) + if err != nil { + sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + } + } else { + sp.ww.PushWorker(w) + } + + return internal.Payload{}, errors.E(op, err) + } + + w.State().Set(internal.StateInvalid) + sp.events.Push(pool.Event{Event: pool.EventWorkerDestruct, Payload: w}) + errS := w.Stop(bCtx) + + if errS != nil { + return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) + } + + return internal.Payload{}, errors.E(op, err) + } +} + +func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator { + return func() (worker.BaseProcess, error) { + w, err := factory.SpawnWorkerWithContext(bCtx, cmd()) + if err != nil { + return nil, err + } + + sw, err := syncWorker.From(w) + if err != nil { + return nil, err + } + return sw, nil + } +} + +func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) { + sw, err := sp.allocator() + if err != nil { + return internal.Payload{}, err + } + + r, err := sw.(worker.SyncWorker).Exec(p) + + if stopErr := sw.Stop(context.Background()); stopErr != nil { + sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: err}) + } + + return r, err +} + +// allocate required number of stack +func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) { + const op = errors.Op("allocate workers") + var workers []worker.BaseProcess + + // constant number of stack simplify logic + for i := int64(0); i < numWorkers; i++ { + ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) + if err != nil { + cancel() + return nil, errors.E(op, errors.WorkerAllocate, err) + } + workers = append(workers, w) + cancel() + } + return workers, nil +} diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go new file mode 100755 index 00000000..8b13c7c9 --- /dev/null +++ b/pkg/pool/static_pool_test.go @@ -0,0 +1,558 @@ +package pool + +import ( + "context" + "log" + "os/exec" + "runtime" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/pipe" + "github.com/stretchr/testify/assert" +) + +var cfg = Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, +} + +func Test_NewPool(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) +} + +func Test_StaticPool_Invalid(t *testing.T) { + p, err := NewPool( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, + pipe.NewPipeFactory(), + cfg, + ) + + assert.Nil(t, p) + assert.Error(t, err) +} + +func Test_ConfigNoErrorInitDefaults(t *testing.T) { + p, err := NewPool( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) +} + +func Test_StaticPool_Echo(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_NilContext(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: nil}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_Context(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: []byte("world")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.Empty(t, res.Body) + assert.NotNil(t, res.Context) + + assert.Equal(t, "world", string(res.Context)) +} + +func Test_StaticPool_JobError(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + if errors.Is(errors.ErrSoftJob, err) == false { + t.Fatal("error should be of type errors.Exec") + } + + assert.Contains(t, err.Error(), "hello") +} + +func Test_StaticPool_Broken_Replace(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + assert.NotNil(t, p) + + block := make(chan struct{}) + + p.AddListener(func(event interface{}) { + if wev, ok := event.(worker.Event); ok { + if wev.Event == worker.EventWorkerLog { + e := string(wev.Payload.([]byte)) + if strings.ContainsAny(e, "undefined_function()") { + block <- struct{}{} + return + } + } + } + }) + + res, err := p.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + + <-block + + p.Destroy(ctx) +} + +func Test_StaticPool_Broken_FromOutside(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) + assert.Equal(t, runtime.NumCPU(), len(p.Workers())) + + // Consume pool events + wg := sync.WaitGroup{} + wg.Add(1) + p.AddListener(func(event interface{}) { + if pe, ok := event.(pool.Event); ok { + if pe.Event == pool.EventWorkerConstruct { + wg.Done() + } + } + }) + + // killing random worker and expecting pool to replace it + err = p.Workers()[0].Kill() + if err != nil { + t.Errorf("error killing the process: error %v", err) + } + + wg.Wait() + + list := p.Workers() + for _, w := range list { + assert.Equal(t, internal.StateReady, w.State().Value()) + } + wg.Wait() +} + +func Test_StaticPool_AllocateTimeout(t *testing.T) { + p, err := NewPool( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Nanosecond * 1, + DestroyTimeout: time.Second * 2, + }, + ) + assert.Error(t, err) + if !errors.Is(errors.WorkerAllocate, err) { + t.Fatal("error should be of type WorkerAllocate") + } + assert.Nil(t, p) +} + +func Test_StaticPool_Replace_Worker(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + var lastPID string + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + + res, _ := p.Exec(internal.Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +func Test_StaticPool_Debug_Worker(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, + pipe.NewPipeFactory(), + Config{ + Debug: true, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + assert.Len(t, p.Workers(), 0) + + var lastPID string + res, _ := p.Exec(internal.Payload{Body: []byte("hello")}) + assert.NotEqual(t, lastPID, string(res.Body)) + + assert.Len(t, p.Workers(), 0) + + for i := 0; i < 10; i++ { + assert.Len(t, p.Workers(), 0) + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +// identical to replace but controlled on worker side +func Test_StaticPool_Stop_Worker(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + var lastPID string + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(internal.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Destroy_And_Close(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + p.Destroy(ctx) + _, err = p.Exec(internal.Payload{Body: []byte("100")}) + assert.Error(t, err) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + go func() { + _, err := p.Exec(internal.Payload{Body: []byte("100")}) + if err != nil { + t.Errorf("error executing payload: error %v", err) + } + }() + time.Sleep(time.Millisecond * 10) + + p.Destroy(ctx) + _, err = p.Exec(internal.Payload{Body: []byte("100")}) + assert.Error(t, err) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Handle_Dead(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + for _, w := range p.Workers() { + w.State().Set(internal.StateErrored) + } + + _, err = p.Exec(internal.Payload{Body: []byte("hello")}) + assert.Error(t, err) +} + +// identical to replace but controlled on worker side +func Test_Static_Pool_Slow_Destroy(t *testing.T) { + p, err := NewPool( + context.Background(), + func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 5, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + p.Destroy(context.Background()) +} + +func Benchmark_Pool_Echo(b *testing.B) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + cfg, + ) + if err != nil { + b.Fatal(err) + } + + b.ResetTimer() + b.ReportAllocs() + for n := 0; n < b.N; n++ { + if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +// +func Benchmark_Pool_Echo_Batched(b *testing.B) { + ctx := context.Background() + p, _ := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: int64(runtime.NumCPU()), + AllocateTimeout: time.Second * 100, + DestroyTimeout: time.Second, + }, + ) + defer p.Destroy(ctx) + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + log.Println(err) + } + }() + } + + wg.Wait() +} + +// +func Benchmark_Pool_Echo_Replaced(b *testing.B) { + ctx := context.Background() + p, _ := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, + pipe.NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + defer p.Destroy(ctx) + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + log.Println(err) + } + } +} diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go new file mode 100755 index 00000000..0a2d16f7 --- /dev/null +++ b/pkg/pool/supervisor_pool.go @@ -0,0 +1,207 @@ +package pool + +import ( + "context" + "sync" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" +) + +const MB = 1024 * 1024 + +type Supervised interface { + pool.Pool + // Start used to start watching process for all pool workers + Start() +} + +type supervised struct { + cfg *SupervisorConfig + events worker.EventsHandler + pool pool.Pool + stopCh chan struct{} + mu *sync.RWMutex +} + +func newPoolWatcher(pool pool.Pool, events worker.EventsHandler, cfg *SupervisorConfig) Supervised { + sp := &supervised{ + cfg: cfg, + events: events, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), + } + return sp +} + +type ttlExec struct { + err error + p internal.Payload +} + +func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) { + const op = errors.Op("exec_supervised") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + c := make(chan ttlExec, 1) + ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL)) + defer cancel() + go func() { + res, err := sp.pool.ExecWithContext(ctx, rqs) + if err != nil { + c <- ttlExec{ + err: errors.E(op, err), + p: internal.Payload{}, + } + } + + c <- ttlExec{ + err: nil, + p: res, + } + }() + + for { + select { + case <-ctx.Done(): + return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) + case res := <-c: + if res.err != nil { + return internal.Payload{}, res.err + } + + return res.p, nil + } + } +} + +func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) { + const op = errors.Op("supervised exec") + rsp, err := sp.pool.Exec(p) + if err != nil { + return internal.Payload{}, errors.E(op, err) + } + return rsp, nil +} + +func (sp *supervised) AddListener(listener worker.EventListener) { + sp.pool.AddListener(listener) +} + +func (sp *supervised) GetConfig() interface{} { + return sp.pool.GetConfig() +} + +func (sp *supervised) Workers() (workers []worker.BaseProcess) { + sp.mu.Lock() + defer sp.mu.Unlock() + return sp.pool.Workers() +} + +func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error { + return sp.pool.RemoveWorker(worker) +} + +func (sp *supervised) Destroy(ctx context.Context) { + sp.pool.Destroy(ctx) +} + +func (sp *supervised) Start() { + go func() { + watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick)) + for { + select { + case <-sp.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + sp.mu.Lock() + sp.control() + sp.mu.Unlock() + } + } + }() +} + +func (sp *supervised) Stop() { + sp.stopCh <- struct{}{} +} + +func (sp *supervised) control() { + now := time.Now() + const op = errors.Op("supervised pool control tick") + + // THIS IS A COPY OF WORKERS + workers := sp.pool.Workers() + + for i := 0; i < len(workers); i++ { + if workers[i].State().Value() == internal.StateInvalid { + continue + } + + s, err := roadrunner.WorkerProcessState(workers[i]) + if err != nil { + // worker not longer valid for supervision + continue + } + + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(pool.Event{Event: pool.EventTTL, Payload: workers[i]}) + continue + } + + if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(pool.Event{Event: pool.EventMaxMemory, Payload: workers[i]}) + continue + } + + // firs we check maxWorker idle + if sp.cfg.IdleTTL != 0 { + // then check for the worker state + if workers[i].State().Value() != internal.StateReady { + continue + } + + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + + // get last used unix nano + lu := workers[i].State().LastUsed() + + // convert last used to unixNano and sub time.now + res := int64(lu) - now.UnixNano() + + // maxWorkerIdle more than diff between now and last used + if sp.cfg.IdleTTL-uint64(res) <= 0 { + err = sp.pool.RemoveWorker(workers[i]) + if err != nil { + sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)}) + return + } + sp.events.Push(pool.Event{Event: pool.EventIdleTTL, Payload: workers[i]}) + } + } + } +} diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go new file mode 100644 index 00000000..2e3e7fd2 --- /dev/null +++ b/pkg/pool/supervisor_test.go @@ -0,0 +1,154 @@ +package pool + +import ( + "context" + "os/exec" + "testing" + "time" + + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/pipe" + "github.com/stretchr/testify/assert" +) + +var cfgSupervised = Config{ + NumWorkers: int64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1, + TTL: 100, + IdleTTL: 100, + ExecTTL: 100, + MaxWorkerMemory: 100, + }, +} + +func TestSupervisedPool_Exec(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgSupervised, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + stopCh := make(chan struct{}) + defer p.Destroy(context.Background()) + + go func() { + for { + select { + case <-stopCh: + return + default: + workers := p.Workers() + if len(workers) > 0 { + s, err := roadrunner.WorkerProcessState(workers[0]) + assert.NoError(t, err) + assert.NotNil(t, s) + // since this is soft limit, double max memory limit watch + if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 { + assert.Fail(t, "max memory reached") + } + } + } + } + }() + + for i := 0; i < 100; i++ { + time.Sleep(time.Millisecond * 50) + _, err = p.Exec(internal.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) + } + + stopCh <- struct{}{} +} + +func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: int64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1, + TTL: 100, + IdleTTL: 100, + ExecTTL: 1, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + resp, err := p.ExecWithContext(context.Background(), internal.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Error(t, err) + assert.Empty(t, resp) + + time.Sleep(time.Second * 1) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) +} + +func TestSupervisedPool_ExecTTL_OK(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: int64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1, + TTL: 100, + IdleTTL: 100, + ExecTTL: 4, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + time.Sleep(time.Millisecond * 100) + resp, err := p.Exec(internal.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 1) + // should be the same pid + assert.Equal(t, pid, p.Workers()[0].Pid()) +} |