summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-17 02:34:44 +0300
committerValery Piashchynski <[email protected]>2020-12-17 02:34:44 +0300
commit9d5fe4f6a98b30fd73be8259f84fa595ac994a71 (patch)
treee49c46b03d8facc73e96f1b6247d83367cc65398 /pkg/pool
parent1033c25b6bfc752d6059e446510f651e22cbf49b (diff)
huge refactor
Diffstat (limited to 'pkg/pool')
-rw-r--r--pkg/pool/config.go75
-rwxr-xr-xpkg/pool/static_pool.go351
-rwxr-xr-xpkg/pool/static_pool_test.go558
-rwxr-xr-xpkg/pool/supervisor_pool.go207
-rw-r--r--pkg/pool/supervisor_test.go154
5 files changed, 1345 insertions, 0 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
new file mode 100644
index 00000000..3dcc3584
--- /dev/null
+++ b/pkg/pool/config.go
@@ -0,0 +1,75 @@
+package pool
+
+import (
+ "runtime"
+ "time"
+)
+
+// Configures the pool behaviour.
+type Config struct {
+ // Debug flag creates new fresh worker before every request.
+ Debug bool
+
+ // NumWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
+ NumWorkers int64
+
+ // MaxJobs defines how many executions is allowed for the worker until
+ // it's destruction. set 1 to create new process for each new task, 0 to let
+ // worker handle as many tasks as it can.
+ MaxJobs int64
+
+ // AllocateTimeout defines for how long pool will be waiting for a worker to
+ // be freed to handle the task. Defaults to 60s.
+ AllocateTimeout time.Duration
+
+ // DestroyTimeout defines for how long pool should be waiting for worker to
+ // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
+ DestroyTimeout time.Duration
+
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor *SupervisorConfig
+}
+
+// InitDefaults enables default config values.
+func (cfg *Config) InitDefaults() {
+ if cfg.NumWorkers == 0 {
+ cfg.NumWorkers = int64(runtime.NumCPU())
+ }
+
+ if cfg.AllocateTimeout == 0 {
+ cfg.AllocateTimeout = time.Minute
+ }
+
+ if cfg.DestroyTimeout == 0 {
+ cfg.DestroyTimeout = time.Minute
+ }
+ if cfg.Supervisor == nil {
+ return
+ }
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick uint64
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL uint64
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL uint64
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL uint64
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = 1
+ }
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
new file mode 100755
index 00000000..220ea8e9
--- /dev/null
+++ b/pkg/pool/static_pool.go
@@ -0,0 +1,351 @@
+package pool
+
+import (
+ "context"
+ "os/exec"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+ syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
+ workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
+ "github.com/spiral/roadrunner/v2/util"
+)
+
+// StopRequest can be sent by worker to indicate that restart is required.
+const StopRequest = "{\"stop\":true}"
+
+var bCtx = context.Background()
+
+// ErrorEncoder encode error or make a decision based on the error type
+type ErrorEncoder func(err error, w worker.BaseProcess) (internal.Payload, error)
+
+// Before is set of functions that executes BEFORE Exec
+type Before func(req internal.Payload) internal.Payload
+
+// After is set of functions that executes AFTER Exec
+type After func(req internal.Payload, resp internal.Payload) internal.Payload
+
+type Options func(p *StaticPool)
+
+// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
+type StaticPool struct {
+ cfg Config
+
+ // worker command creator
+ cmd func() *exec.Cmd
+
+ // creates and connects to stack
+ factory worker.Factory
+
+ // distributes the events
+ events worker.EventsHandler
+
+ // manages worker states and TTLs
+ ww worker.Watcher
+
+ // allocate new worker
+ allocator worker.Allocator
+
+ errEncoder ErrorEncoder
+ before []Before
+ after []After
+}
+
+// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, cfg Config, options ...Options) (pool.Pool, error) {
+ const op = errors.Op("NewPool")
+ if factory == nil {
+ return nil, errors.E(op, errors.Str("no factory initialized"))
+ }
+ cfg.InitDefaults()
+
+ if cfg.Debug {
+ cfg.NumWorkers = 0
+ cfg.MaxJobs = 1
+ }
+
+ p := &StaticPool{
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
+ events: util.NewEventsHandler(),
+ after: make([]After, 0, 0),
+ before: make([]Before, 0, 0),
+ }
+
+ p.allocator = newPoolAllocator(factory, cmd)
+ p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+
+ workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // put stack in the pool
+ err = p.ww.AddToWatch(workers)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ p.errEncoder = defaultErrEncoder(p)
+
+ // add pool options
+ for i := 0; i < len(options); i++ {
+ options[i](p)
+ }
+
+ // if supervised config not nil, guess, that pool wanted to be supervised
+ if cfg.Supervisor != nil {
+ sp := newPoolWatcher(p, p.events, p.cfg.Supervisor)
+ // start watcher timer
+ sp.Start()
+ return sp, nil
+ }
+
+ return p, nil
+}
+
+func ExecBefore(before ...Before) Options {
+ return func(p *StaticPool) {
+ p.before = append(p.before, before...)
+ }
+}
+
+func ExecAfter(after ...After) Options {
+ return func(p *StaticPool) {
+ p.after = append(p.after, after...)
+ }
+}
+
+// AddListener connects event listener to the pool.
+func (sp *StaticPool) AddListener(listener worker.EventListener) {
+ sp.events.AddListener(listener)
+}
+
+// Config returns associated pool configuration. Immutable.
+func (sp *StaticPool) GetConfig() interface{} {
+ return sp.cfg
+}
+
+// Workers returns worker list associated with the pool.
+func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
+ return sp.ww.WorkersList()
+}
+
+func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
+ return sp.ww.RemoveWorker(wb)
+}
+
+func (sp *StaticPool) Exec(p internal.Payload) (internal.Payload, error) {
+ const op = errors.Op("exec")
+ if sp.cfg.Debug {
+ return sp.execDebug(p)
+ }
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
+ if err != nil {
+ return internal.Payload{}, errors.E(op, err)
+ }
+
+ sw := w.(worker.SyncWorker)
+
+ if len(sp.before) > 0 {
+ for i := 0; i < len(sp.before); i++ {
+ p = sp.before[i](p)
+ }
+ }
+
+ rsp, err := sw.Exec(p)
+ if err != nil {
+ return sp.errEncoder(err, sw)
+ }
+
+ // worker want's to be terminated
+ // TODO careful with string(rsp.Context)
+ if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ sw.State().Set(internal.StateInvalid)
+ err = sw.Stop(bCtx)
+ if err != nil {
+ sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ }
+
+ return sp.Exec(p)
+ }
+
+ if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew()
+ if err != nil {
+ return internal.Payload{}, errors.E(op, err)
+ }
+ } else {
+ sp.ww.PushWorker(sw)
+ }
+
+ if len(sp.after) > 0 {
+ for i := 0; i < len(sp.after); i++ {
+ rsp = sp.after[i](p, rsp)
+ }
+ }
+
+ return rsp, nil
+}
+
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+ const op = errors.Op("exec with context")
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.getWorker(ctxGetFree, op)
+ if err != nil {
+ return internal.Payload{}, errors.E(op, err)
+ }
+
+ sw := w.(worker.SyncWorker)
+
+ // apply all before function
+ if len(sp.before) > 0 {
+ for i := 0; i < len(sp.before); i++ {
+ rqs = sp.before[i](rqs)
+ }
+ }
+
+ rsp, err := sw.ExecWithContext(ctx, rqs)
+ if err != nil {
+ return sp.errEncoder(err, sw)
+ }
+
+ // worker want's to be terminated
+ if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ sw.State().Set(internal.StateInvalid)
+ err = sw.Stop(bCtx)
+ if err != nil {
+ sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ }
+
+ return sp.Exec(rqs)
+ }
+
+ if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew()
+ if err != nil {
+ return internal.Payload{}, errors.E(op, err)
+ }
+ } else {
+ sp.ww.PushWorker(sw)
+ }
+
+ // apply all after functions
+ if len(sp.after) > 0 {
+ for i := 0; i < len(sp.after); i++ {
+ rsp = sp.after[i](rqs, rsp)
+ }
+ }
+
+ return rsp, nil
+}
+
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+ // GetFreeWorker function consumes context with timeout
+ w, err := sp.ww.GetFreeWorker(ctxGetFree)
+ if err != nil {
+ // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
+ if errors.Is(errors.NoFreeWorkers, err) {
+ sp.events.Push(pool.Event{Event: pool.EventNoFreeWorkers, Payload: errors.E(op, err)})
+ return nil, errors.E(op, err)
+ }
+ // else if err not nil - return error
+ return nil, errors.E(op, err)
+ }
+ return w, nil
+}
+
+// Destroy all underlying stack (but let them to complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.ww.Destroy(ctx)
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+ return func(err error, w worker.BaseProcess) (internal.Payload, error) {
+ const op = errors.Op("error encoder")
+ // soft job errors are allowed
+ if errors.Is(errors.ErrSoftJob, err) {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew()
+ if err != nil {
+ sp.events.Push(pool.Event{Event: pool.EventWorkerConstruct, Payload: errors.E(op, err)})
+ }
+
+ w.State().Set(internal.StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ }
+ } else {
+ sp.ww.PushWorker(w)
+ }
+
+ return internal.Payload{}, errors.E(op, err)
+ }
+
+ w.State().Set(internal.StateInvalid)
+ sp.events.Push(pool.Event{Event: pool.EventWorkerDestruct, Payload: w})
+ errS := w.Stop(bCtx)
+
+ if errS != nil {
+ return internal.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
+ }
+
+ return internal.Payload{}, errors.E(op, err)
+ }
+}
+
+func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
+ return func() (worker.BaseProcess, error) {
+ w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
+ if err != nil {
+ return nil, err
+ }
+
+ sw, err := syncWorker.From(w)
+ if err != nil {
+ return nil, err
+ }
+ return sw, nil
+ }
+}
+
+func (sp *StaticPool) execDebug(p internal.Payload) (internal.Payload, error) {
+ sw, err := sp.allocator()
+ if err != nil {
+ return internal.Payload{}, err
+ }
+
+ r, err := sw.(worker.SyncWorker).Exec(p)
+
+ if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ sp.events.Push(worker.Event{Event: worker.EventWorkerError, Worker: sw, Payload: err})
+ }
+
+ return r, err
+}
+
+// allocate required number of stack
+func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
+ const op = errors.Op("allocate workers")
+ var workers []worker.BaseProcess
+
+ // constant number of stack simplify logic
+ for i := int64(0); i < numWorkers; i++ {
+ ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
+ if err != nil {
+ cancel()
+ return nil, errors.E(op, errors.WorkerAllocate, err)
+ }
+ workers = append(workers, w)
+ cancel()
+ }
+ return workers, nil
+}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
new file mode 100755
index 00000000..8b13c7c9
--- /dev/null
+++ b/pkg/pool/static_pool_test.go
@@ -0,0 +1,558 @@
+package pool
+
+import (
+ "context"
+ "log"
+ "os/exec"
+ "runtime"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ "github.com/stretchr/testify/assert"
+)
+
+var cfg = Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
+}
+
+func Test_NewPool(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+}
+
+func Test_StaticPool_Invalid(t *testing.T) {
+ p, err := NewPool(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+
+ assert.Nil(t, p)
+ assert.Error(t, err)
+}
+
+func Test_ConfigNoErrorInitDefaults(t *testing.T) {
+ p, err := NewPool(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+}
+
+func Test_StaticPool_Echo(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_StaticPool_Echo_NilContext(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: nil})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_StaticPool_Echo_Context(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(internal.Payload{Body: []byte("hello"), Context: []byte("world")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.Empty(t, res.Body)
+ assert.NotNil(t, res.Context)
+
+ assert.Equal(t, "world", string(res.Context))
+}
+
+func Test_StaticPool_JobError(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ if errors.Is(errors.ErrSoftJob, err) == false {
+ t.Fatal("error should be of type errors.Exec")
+ }
+
+ assert.Contains(t, err.Error(), "hello")
+}
+
+func Test_StaticPool_Broken_Replace(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ block := make(chan struct{})
+
+ p.AddListener(func(event interface{}) {
+ if wev, ok := event.(worker.Event); ok {
+ if wev.Event == worker.EventWorkerLog {
+ e := string(wev.Payload.([]byte))
+ if strings.ContainsAny(e, "undefined_function()") {
+ block <- struct{}{}
+ return
+ }
+ }
+ }
+ })
+
+ res, err := p.ExecWithContext(ctx, internal.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+
+ <-block
+
+ p.Destroy(ctx)
+}
+
+func Test_StaticPool_Broken_FromOutside(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+ assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
+
+ // Consume pool events
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ p.AddListener(func(event interface{}) {
+ if pe, ok := event.(pool.Event); ok {
+ if pe.Event == pool.EventWorkerConstruct {
+ wg.Done()
+ }
+ }
+ })
+
+ // killing random worker and expecting pool to replace it
+ err = p.Workers()[0].Kill()
+ if err != nil {
+ t.Errorf("error killing the process: error %v", err)
+ }
+
+ wg.Wait()
+
+ list := p.Workers()
+ for _, w := range list {
+ assert.Equal(t, internal.StateReady, w.State().Value())
+ }
+ wg.Wait()
+}
+
+func Test_StaticPool_AllocateTimeout(t *testing.T) {
+ p, err := NewPool(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Nanosecond * 1,
+ DestroyTimeout: time.Second * 2,
+ },
+ )
+ assert.Error(t, err)
+ if !errors.Is(errors.WorkerAllocate, err) {
+ t.Fatal("error should be of type WorkerAllocate")
+ }
+ assert.Nil(t, p)
+}
+
+func Test_StaticPool_Replace_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ MaxJobs: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ var lastPID string
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
+
+ res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+func Test_StaticPool_Debug_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ Debug: true,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ assert.Len(t, p.Workers(), 0)
+
+ var lastPID string
+ res, _ := p.Exec(internal.Payload{Body: []byte("hello")})
+ assert.NotEqual(t, lastPID, string(res.Body))
+
+ assert.Len(t, p.Workers(), 0)
+
+ for i := 0; i < 10; i++ {
+ assert.Len(t, p.Workers(), 0)
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+// identical to replace but controlled on worker side
+func Test_StaticPool_Stop_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ var lastPID string
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
+
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(internal.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ p.Destroy(ctx)
+ _, err = p.Exec(internal.Payload{Body: []byte("100")})
+ assert.Error(t, err)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ go func() {
+ _, err := p.Exec(internal.Payload{Body: []byte("100")})
+ if err != nil {
+ t.Errorf("error executing payload: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 10)
+
+ p.Destroy(ctx)
+ _, err = p.Exec(internal.Payload{Body: []byte("100")})
+ assert.Error(t, err)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Handle_Dead(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ for _, w := range p.Workers() {
+ w.State().Set(internal.StateErrored)
+ }
+
+ _, err = p.Exec(internal.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Slow_Destroy(t *testing.T) {
+ p, err := NewPool(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ p.Destroy(context.Background())
+}
+
+func Benchmark_Pool_Echo(b *testing.B) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+//
+func Benchmark_Pool_Echo_Batched(b *testing.B) {
+ ctx := context.Background()
+ p, _ := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second * 100,
+ DestroyTimeout: time.Second,
+ },
+ )
+ defer p.Destroy(ctx)
+
+ var wg sync.WaitGroup
+ for i := 0; i < b.N; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ log.Println(err)
+ }
+ }()
+ }
+
+ wg.Wait()
+}
+
+//
+func Benchmark_Pool_Echo_Replaced(b *testing.B) {
+ ctx := context.Background()
+ p, _ := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ MaxJobs: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ defer p.Destroy(ctx)
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(internal.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ log.Println(err)
+ }
+ }
+}
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
new file mode 100755
index 00000000..0a2d16f7
--- /dev/null
+++ b/pkg/pool/supervisor_pool.go
@@ -0,0 +1,207 @@
+package pool
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/internal"
+)
+
+const MB = 1024 * 1024
+
+type Supervised interface {
+ pool.Pool
+ // Start used to start watching process for all pool workers
+ Start()
+}
+
+type supervised struct {
+ cfg *SupervisorConfig
+ events worker.EventsHandler
+ pool pool.Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
+}
+
+func newPoolWatcher(pool pool.Pool, events worker.EventsHandler, cfg *SupervisorConfig) Supervised {
+ sp := &supervised{
+ cfg: cfg,
+ events: events,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
+ }
+ return sp
+}
+
+type ttlExec struct {
+ err error
+ p internal.Payload
+}
+
+func (sp *supervised) ExecWithContext(ctx context.Context, rqs internal.Payload) (internal.Payload, error) {
+ const op = errors.Op("exec_supervised")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ c := make(chan ttlExec, 1)
+ ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL))
+ defer cancel()
+ go func() {
+ res, err := sp.pool.ExecWithContext(ctx, rqs)
+ if err != nil {
+ c <- ttlExec{
+ err: errors.E(op, err),
+ p: internal.Payload{},
+ }
+ }
+
+ c <- ttlExec{
+ err: nil,
+ p: res,
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return internal.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return internal.Payload{}, res.err
+ }
+
+ return res.p, nil
+ }
+ }
+}
+
+func (sp *supervised) Exec(p internal.Payload) (internal.Payload, error) {
+ const op = errors.Op("supervised exec")
+ rsp, err := sp.pool.Exec(p)
+ if err != nil {
+ return internal.Payload{}, errors.E(op, err)
+ }
+ return rsp, nil
+}
+
+func (sp *supervised) AddListener(listener worker.EventListener) {
+ sp.pool.AddListener(listener)
+}
+
+func (sp *supervised) GetConfig() interface{} {
+ return sp.pool.GetConfig()
+}
+
+func (sp *supervised) Workers() (workers []worker.BaseProcess) {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Workers()
+}
+
+func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
+ return sp.pool.RemoveWorker(worker)
+}
+
+func (sp *supervised) Destroy(ctx context.Context) {
+ sp.pool.Destroy(ctx)
+}
+
+func (sp *supervised) Start() {
+ go func() {
+ watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
+ for {
+ select {
+ case <-sp.stopCh:
+ watchTout.Stop()
+ return
+ // stop here
+ case <-watchTout.C:
+ sp.mu.Lock()
+ sp.control()
+ sp.mu.Unlock()
+ }
+ }
+ }()
+}
+
+func (sp *supervised) Stop() {
+ sp.stopCh <- struct{}{}
+}
+
+func (sp *supervised) control() {
+ now := time.Now()
+ const op = errors.Op("supervised pool control tick")
+
+ // THIS IS A COPY OF WORKERS
+ workers := sp.pool.Workers()
+
+ for i := 0; i < len(workers); i++ {
+ if workers[i].State().Value() == internal.StateInvalid {
+ continue
+ }
+
+ s, err := roadrunner.WorkerProcessState(workers[i])
+ if err != nil {
+ // worker not longer valid for supervision
+ continue
+ }
+
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(pool.Event{Event: pool.EventTTL, Payload: workers[i]})
+ continue
+ }
+
+ if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(pool.Event{Event: pool.EventMaxMemory, Payload: workers[i]})
+ continue
+ }
+
+ // firs we check maxWorker idle
+ if sp.cfg.IdleTTL != 0 {
+ // then check for the worker state
+ if workers[i].State().Value() != internal.StateReady {
+ continue
+ }
+
+ /*
+ Calculate idle time
+ If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
+ 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
+ we are guessing that worker overlap idle time and has to be killed
+ */
+
+ // get last used unix nano
+ lu := workers[i].State().LastUsed()
+
+ // convert last used to unixNano and sub time.now
+ res := int64(lu) - now.UnixNano()
+
+ // maxWorkerIdle more than diff between now and last used
+ if sp.cfg.IdleTTL-uint64(res) <= 0 {
+ err = sp.pool.RemoveWorker(workers[i])
+ if err != nil {
+ sp.events.Push(pool.Event{Event: pool.EventSupervisorError, Payload: errors.E(op, err)})
+ return
+ }
+ sp.events.Push(pool.Event{Event: pool.EventIdleTTL, Payload: workers[i]})
+ }
+ }
+ }
+}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
new file mode 100644
index 00000000..2e3e7fd2
--- /dev/null
+++ b/pkg/pool/supervisor_test.go
@@ -0,0 +1,154 @@
+package pool
+
+import (
+ "context"
+ "os/exec"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ "github.com/stretchr/testify/assert"
+)
+
+var cfgSupervised = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 100,
+ MaxWorkerMemory: 100,
+ },
+}
+
+func TestSupervisedPool_Exec(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgSupervised,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ stopCh := make(chan struct{})
+ defer p.Destroy(context.Background())
+
+ go func() {
+ for {
+ select {
+ case <-stopCh:
+ return
+ default:
+ workers := p.Workers()
+ if len(workers) > 0 {
+ s, err := roadrunner.WorkerProcessState(workers[0])
+ assert.NoError(t, err)
+ assert.NotNil(t, s)
+ // since this is soft limit, double max memory limit watch
+ if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 {
+ assert.Fail(t, "max memory reached")
+ }
+ }
+ }
+ }
+ }()
+
+ for i := 0; i < 100; i++ {
+ time.Sleep(time.Millisecond * 50)
+ _, err = p.Exec(internal.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+ assert.NoError(t, err)
+ }
+
+ stopCh <- struct{}{}
+}
+
+func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 1,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.ExecWithContext(context.Background(), internal.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Error(t, err)
+ assert.Empty(t, resp)
+
+ time.Sleep(time.Second * 1)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 100,
+ ExecTTL: 4,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(internal.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 1)
+ // should be the same pid
+ assert.Equal(t, pid, p.Workers()[0].Pid())
+}