summaryrefslogtreecommitdiff
path: root/pool
diff options
context:
space:
mode:
Diffstat (limited to 'pool')
-rw-r--r--pool/config.go75
-rw-r--r--pool/interface.go53
-rwxr-xr-xpool/static_pool.go374
-rwxr-xr-xpool/static_pool_test.go721
-rwxr-xr-xpool/supervisor_pool.go230
-rw-r--r--pool/supervisor_test.go413
6 files changed, 1866 insertions, 0 deletions
diff --git a/pool/config.go b/pool/config.go
new file mode 100644
index 00000000..3a058956
--- /dev/null
+++ b/pool/config.go
@@ -0,0 +1,75 @@
+package pool
+
+import (
+ "runtime"
+ "time"
+)
+
+// Config .. Pool config Configures the pool behavior.
+type Config struct {
+ // Debug flag creates new fresh worker before every request.
+ Debug bool
+
+ // NumWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
+ NumWorkers uint64 `mapstructure:"num_workers"`
+
+ // MaxJobs defines how many executions is allowed for the worker until
+ // it's destruction. set 1 to create new process for each new task, 0 to let
+ // worker handle as many tasks as it can.
+ MaxJobs uint64 `mapstructure:"max_jobs"`
+
+ // AllocateTimeout defines for how long pool will be waiting for a worker to
+ // be freed to handle the task. Defaults to 60s.
+ AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`
+
+ // DestroyTimeout defines for how long pool should be waiting for worker to
+ // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
+ DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`
+
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor *SupervisorConfig `mapstructure:"supervisor"`
+}
+
+// InitDefaults enables default config values.
+func (cfg *Config) InitDefaults() {
+ if cfg.NumWorkers == 0 {
+ cfg.NumWorkers = uint64(runtime.NumCPU())
+ }
+
+ if cfg.AllocateTimeout == 0 {
+ cfg.AllocateTimeout = time.Minute
+ }
+
+ if cfg.DestroyTimeout == 0 {
+ cfg.DestroyTimeout = time.Minute
+ }
+ if cfg.Supervisor == nil {
+ return
+ }
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick time.Duration `mapstructure:"watch_tick"`
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL time.Duration `mapstructure:"ttl"`
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL time.Duration `mapstructure:"idle_ttl"`
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL time.Duration `mapstructure:"exec_ttl"`
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = time.Second
+ }
+}
diff --git a/pool/interface.go b/pool/interface.go
new file mode 100644
index 00000000..d089092f
--- /dev/null
+++ b/pool/interface.go
@@ -0,0 +1,53 @@
+package pool
+
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/payload"
+ "github.com/spiral/roadrunner/v2/worker"
+)
+
+// Pool managed set of inner worker processes.
+type Pool interface {
+ // GetConfig returns pool configuration.
+ GetConfig() interface{}
+
+ // Exec executes task with payload
+ Exec(rqs *payload.Payload) (*payload.Payload, error)
+
+ // Workers returns worker list associated with the pool.
+ Workers() (workers []worker.BaseProcess)
+
+ // RemoveWorker removes worker from the pool.
+ RemoveWorker(worker worker.BaseProcess) error
+
+ // Destroy all underlying stack (but let them to complete the task).
+ Destroy(ctx context.Context)
+
+ // ExecWithContext executes task with context which is used with timeout
+ execWithTTL(ctx context.Context, rqs *payload.Payload) (*payload.Payload, error)
+}
+
+// Watcher is an interface for the Sync workers lifecycle
+type Watcher interface {
+ // Watch used to add workers to the container
+ Watch(workers []worker.BaseProcess) error
+
+ // Take takes the first free worker
+ Take(ctx context.Context) (worker.BaseProcess, error)
+
+ // Release releases the worker putting it back to the queue
+ Release(w worker.BaseProcess)
+
+ // Allocate - allocates new worker and put it into the WorkerWatcher
+ Allocate() error
+
+ // Destroy destroys the underlying container
+ Destroy(ctx context.Context)
+
+ // List return all container w/o removing it from internal storage
+ List() []worker.BaseProcess
+
+ // Remove will remove worker from the container
+ Remove(wb worker.BaseProcess)
+}
diff --git a/pool/static_pool.go b/pool/static_pool.go
new file mode 100755
index 00000000..25097395
--- /dev/null
+++ b/pool/static_pool.go
@@ -0,0 +1,374 @@
+package pool
+
+import (
+ "context"
+ "os/exec"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/events"
+ "github.com/spiral/roadrunner/v2/payload"
+ "github.com/spiral/roadrunner/v2/transport"
+ "github.com/spiral/roadrunner/v2/utils"
+ "github.com/spiral/roadrunner/v2/worker"
+ workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
+)
+
+// StopRequest can be sent by worker to indicate that restart is required.
+const StopRequest = "{\"stop\":true}"
+
+// ErrorEncoder encode error or make a decision based on the error type
+type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
+
+type Options func(p *StaticPool)
+
+type Command func() *exec.Cmd
+
+// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
+type StaticPool struct {
+ cfg *Config
+
+ // worker command creator
+ cmd Command
+
+ // creates and connects to stack
+ factory transport.Factory
+
+ // distributes the events
+ events events.Handler
+
+ // saved list of event listeners
+ listeners []events.Listener
+
+ // manages worker states and TTLs
+ ww Watcher
+
+ // allocate new worker
+ allocator worker.Allocator
+
+ // errEncoder is the default Exec error encoder
+ errEncoder ErrorEncoder
+}
+
+// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
+func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
+ const op = errors.Op("static_pool_initialize")
+ if factory == nil {
+ return nil, errors.E(op, errors.Str("no factory initialized"))
+ }
+ cfg.InitDefaults()
+
+ if cfg.Debug {
+ cfg.NumWorkers = 0
+ cfg.MaxJobs = 1
+ }
+
+ p := &StaticPool{
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
+ events: events.NewEventsHandler(),
+ }
+
+ // add pool options
+ for i := 0; i < len(options); i++ {
+ options[i](p)
+ }
+
+ // set up workers allocator
+ p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
+ // set up workers watcher
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout)
+
+ // allocate requested number of workers
+ workers, err := p.allocateWorkers(p.cfg.NumWorkers)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // add workers to the watcher
+ err = p.ww.Watch(workers)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ p.errEncoder = defaultErrEncoder(p)
+
+ // if supervised config not nil, guess, that pool wanted to be supervised
+ if cfg.Supervisor != nil {
+ sp := supervisorWrapper(p, p.events, p.cfg.Supervisor)
+ // start watcher timer
+ sp.Start()
+ return sp, nil
+ }
+
+ return p, nil
+}
+
+func AddListeners(listeners ...events.Listener) Options {
+ return func(p *StaticPool) {
+ p.listeners = listeners
+ for i := 0; i < len(listeners); i++ {
+ p.addListener(listeners[i])
+ }
+ }
+}
+
+// AddListener connects event listener to the pool.
+func (sp *StaticPool) addListener(listener events.Listener) {
+ sp.events.AddListener(listener)
+}
+
+// GetConfig returns associated pool configuration. Immutable.
+func (sp *StaticPool) GetConfig() interface{} {
+ return sp.cfg
+}
+
+// Workers returns worker list associated with the pool.
+func (sp *StaticPool) Workers() (workers []worker.BaseProcess) {
+ return sp.ww.List()
+}
+
+func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
+ sp.ww.Remove(wb)
+ return nil
+}
+
+// Exec executes provided payload on the worker
+func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("static_pool_exec")
+ if sp.cfg.Debug {
+ return sp.execDebug(p)
+ }
+ ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.takeWorker(ctxGetFree, op)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ rsp, err := w.(worker.SyncWorker).Exec(p)
+ if err != nil {
+ return sp.errEncoder(err, w)
+ }
+
+ // worker want's to be terminated
+ if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest {
+ sp.stopWorker(w)
+ return sp.Exec(p)
+ }
+
+ if sp.cfg.MaxJobs != 0 {
+ sp.checkMaxJobs(w)
+ return rsp, nil
+ }
+ // return worker back
+ sp.ww.Release(w)
+ return rsp, nil
+}
+
+// Be careful, sync with pool.Exec method
+func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("static_pool_exec_with_context")
+ if sp.cfg.Debug {
+ return sp.execDebugWithTTL(ctx, p)
+ }
+
+ ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+ defer cancel()
+ w, err := sp.takeWorker(ctxAlloc, op)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p)
+ if err != nil {
+ return sp.errEncoder(err, w)
+ }
+
+ // worker want's to be terminated
+ if len(rsp.Body) == 0 && utils.AsString(rsp.Context) == StopRequest {
+ sp.stopWorker(w)
+ return sp.execWithTTL(ctx, p)
+ }
+
+ if sp.cfg.MaxJobs != 0 {
+ sp.checkMaxJobs(w)
+ return rsp, nil
+ }
+
+ // return worker back
+ sp.ww.Release(w)
+ return rsp, nil
+}
+
+func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
+ const op = errors.Op("static_pool_stop_worker")
+ w.State().Set(worker.StateInvalid)
+ err := w.Stop()
+ if err != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ }
+}
+
+// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
+//go:inline
+func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
+ if w.State().NumExecs() >= sp.cfg.MaxJobs {
+ w.State().Set(worker.StateMaxJobsReached)
+ sp.ww.Release(w)
+ return
+ }
+
+ sp.ww.Release(w)
+}
+
+func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+ // Get function consumes context with timeout
+ w, err := sp.ww.Take(ctxGetFree)
+ if err != nil {
+ // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
+ if errors.Is(errors.NoFreeWorkers, err) {
+ sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)})
+ return nil, errors.E(op, err)
+ }
+ // else if err not nil - return error
+ return nil, errors.E(op, err)
+ }
+ return w, nil
+}
+
+// Destroy all underlying stack (but let them complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.ww.Destroy(ctx)
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+ return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
+ const op = errors.Op("error_encoder")
+ // just push event if on any stage was timeout error
+ switch {
+ case errors.Is(errors.ExecTTL, err):
+ sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: errors.E(op, err)})
+ w.State().Set(worker.StateInvalid)
+ return nil, err
+
+ case errors.Is(errors.SoftJob, err):
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+
+ // if max jobs exceed
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // mark old as invalid and stop
+ w.State().Set(worker.StateInvalid)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, err
+ }
+
+ // soft jobs errors are allowed, just put the worker back
+ sp.ww.Release(w)
+
+ return nil, err
+ case errors.Is(errors.Network, err):
+ // in case of network error, we can't stop the worker, we should kill it
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+
+ // kill the worker instead of sending net packet to it
+ _ = w.Kill()
+
+ return nil, err
+ default:
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ // stop the worker, worker here might be in the broken state (network)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, errors.E(op, err)
+ }
+ }
+}
+
+func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
+ return func() (worker.SyncWorker, error) {
+ ctxT, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...)
+ if err != nil {
+ return nil, err
+ }
+
+ // wrap sync worker
+ sw := worker.From(w)
+
+ sp.events.Push(events.PoolEvent{
+ Event: events.EventWorkerConstruct,
+ Payload: sw,
+ })
+ return sw, nil
+ }
+}
+
+// execDebug used when debug mode was not set and exec_ttl is 0
+func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("static_pool_exec_debug")
+ sw, err := sp.allocator()
+ if err != nil {
+ return nil, err
+ }
+
+ // redirect call to the workers' exec method (without ttl)
+ r, err := sw.Exec(p)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // destroy the worker
+ sw.State().Set(worker.StateDestroyed)
+ err = sw.Kill()
+ if err != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ return nil, errors.E(op, err)
+ }
+
+ return r, nil
+}
+
+// execDebugWithTTL used when user set debug mode and exec_ttl
+func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
+ sw, err := sp.allocator()
+ if err != nil {
+ return nil, err
+ }
+
+ // redirect call to the worker with TTL
+ r, err := sw.ExecWithTTL(ctx, p)
+ if stopErr := sw.Stop(); stopErr != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ }
+
+ return r, err
+}
+
+// allocate required number of stack
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
+ const op = errors.Op("static_pool_allocate_workers")
+ workers := make([]worker.BaseProcess, 0, numWorkers)
+
+ // constant number of stack simplify logic
+ for i := uint64(0); i < numWorkers; i++ {
+ w, err := sp.allocator()
+ if err != nil {
+ return nil, errors.E(op, errors.WorkerAllocate, err)
+ }
+
+ workers = append(workers, w)
+ }
+ return workers, nil
+}
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
new file mode 100755
index 00000000..9861f0d8
--- /dev/null
+++ b/pool/static_pool_test.go
@@ -0,0 +1,721 @@
+package pool
+
+import (
+ "context"
+ "log"
+ "os/exec"
+ "runtime"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/events"
+ "github.com/spiral/roadrunner/v2/payload"
+ "github.com/spiral/roadrunner/v2/transport/pipe"
+ "github.com/spiral/roadrunner/v2/utils"
+ "github.com/spiral/roadrunner/v2/worker"
+ "github.com/stretchr/testify/assert"
+)
+
+var cfg = &Config{
+ NumWorkers: uint64(runtime.NumCPU()),
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
+}
+
+func Test_NewPool(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+}
+
+func Test_StaticPool_Invalid(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+
+ assert.Nil(t, p)
+ assert.Error(t, err)
+}
+
+func Test_ConfigNoErrorInitDefaults(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+}
+
+func Test_StaticPool_Echo(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_StaticPool_Echo_NilContext(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: nil})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_StaticPool_Echo_Context(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello"), Context: []byte("world")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.Empty(t, res.Body)
+ assert.NotNil(t, res.Context)
+
+ assert.Equal(t, "world", string(res.Context))
+}
+
+func Test_StaticPool_JobError(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ time.Sleep(time.Second * 2)
+
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res)
+
+ if errors.Is(errors.SoftJob, err) == false {
+ t.Fatal("error should be of type errors.Exec")
+ }
+
+ assert.Contains(t, err.Error(), "hello")
+ p.Destroy(ctx)
+}
+
+func Test_StaticPool_Broken_Replace(t *testing.T) {
+ ctx := context.Background()
+ block := make(chan struct{}, 10)
+
+ listener := func(event interface{}) {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ if wev.Event == events.EventWorkerStderr {
+ e := string(wev.Payload.([]byte))
+ if strings.ContainsAny(e, "undefined_function()") {
+ block <- struct{}{}
+ return
+ }
+ }
+ }
+ }
+
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ time.Sleep(time.Second)
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res)
+
+ <-block
+
+ p.Destroy(ctx)
+}
+
+func Test_StaticPool_Broken_FromOutside(t *testing.T) {
+ ctx := context.Background()
+ // Run pool events
+ ev := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if pe, ok := event.(events.PoolEvent); ok {
+ if pe.Event == events.EventWorkerConstruct {
+ ev <- struct{}{}
+ }
+ }
+ }
+
+ var cfg2 = &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
+ }
+
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg2,
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(ctx)
+ time.Sleep(time.Second)
+
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+ assert.Equal(t, 1, len(p.Workers()))
+
+ // first creation
+ <-ev
+ // killing random worker and expecting pool to replace it
+ err = p.Workers()[0].Kill()
+ if err != nil {
+ t.Errorf("error killing the process: error %v", err)
+ }
+
+ // re-creation
+ <-ev
+
+ list := p.Workers()
+ for _, w := range list {
+ assert.Equal(t, worker.StateReady, w.State().Value())
+ }
+}
+
+func Test_StaticPool_AllocateTimeout(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Nanosecond * 1,
+ DestroyTimeout: time.Second * 2,
+ },
+ )
+ assert.Error(t, err)
+ if !errors.Is(errors.WorkerAllocate, err) {
+ t.Fatal("error should be of type WorkerAllocate")
+ }
+ assert.Nil(t, p)
+}
+
+func Test_StaticPool_Replace_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 1,
+ MaxJobs: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ defer p.Destroy(ctx)
+ // prevent process is not ready
+ time.Sleep(time.Second)
+
+ var lastPID string
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
+
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+func Test_StaticPool_Debug_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ Debug: true,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ defer p.Destroy(ctx)
+
+ // prevent process is not ready
+ time.Sleep(time.Second)
+ assert.Len(t, p.Workers(), 0)
+
+ var lastPID string
+ res, _ := p.Exec(&payload.Payload{Body: []byte("hello")})
+ assert.NotEqual(t, lastPID, string(res.Body))
+
+ assert.Len(t, p.Workers(), 0)
+
+ for i := 0; i < 10; i++ {
+ assert.Len(t, p.Workers(), 0)
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+// identical to replace but controlled on worker side
+func Test_StaticPool_Stop_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ defer p.Destroy(ctx)
+ time.Sleep(time.Second)
+
+ var lastPID string
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
+
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Fatal(err)
+ }
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(&payload.Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Empty(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ p.Destroy(ctx)
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
+ assert.Error(t, err)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ go func() {
+ _, errP := p.Exec(&payload.Payload{Body: []byte("100")})
+ if errP != nil {
+ t.Errorf("error executing payload: error %v", err)
+ }
+ }()
+ time.Sleep(time.Millisecond * 100)
+
+ p.Destroy(ctx)
+ _, err = p.Exec(&payload.Payload{Body: []byte("100")})
+ assert.Error(t, err)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Handle_Dead(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second * 100,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ time.Sleep(time.Second)
+ for i := range p.Workers() {
+ p.Workers()[i].State().Set(worker.StateErrored)
+ }
+
+ _, err = p.Exec(&payload.Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ p.Destroy(ctx)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_Slow_Destroy(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ p.Destroy(context.Background())
+}
+
+func Test_StaticPool_NoFreeWorkers(t *testing.T) {
+ ctx := context.Background()
+ block := make(chan struct{}, 10)
+
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventNoFreeWorkers {
+ block <- struct{}{}
+ }
+ }
+ }
+
+ p, err := Initialize(
+ ctx,
+ // sleep for the 3 seconds
+ func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ Debug: false,
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: nil,
+ },
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ go func() {
+ _, _ = p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
+ }()
+
+ time.Sleep(time.Second)
+ res, err := p.execWithTTL(ctx, &payload.Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+ assert.Nil(t, res)
+
+ <-block
+
+ p.Destroy(ctx)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_WrongCommand1(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
+// identical to replace but controlled on worker side
+func Test_Static_Pool_WrongCommand2(t *testing.T) {
+ p, err := Initialize(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.Error(t, err)
+ assert.Nil(t, p)
+}
+
+/* PTR:
+Benchmark_Pool_Echo-32 49076 29926 ns/op 8016 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 47257 30779 ns/op 8047 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 46737 29440 ns/op 8065 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 51177 29074 ns/op 7981 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 51764 28319 ns/op 8012 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 54054 30714 ns/op 7987 B/op 20 allocs/op
+Benchmark_Pool_Echo-32 54391 30689 ns/op 8055 B/op 20 allocs/op
+
+VAL:
+Benchmark_Pool_Echo-32 47936 28679 ns/op 7942 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 49010 29830 ns/op 7970 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 46771 29031 ns/op 8014 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 47760 30517 ns/op 7955 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 48148 29816 ns/op 7950 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 52705 29809 ns/op 7979 B/op 19 allocs/op
+Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allocs/op
+*/
+func Benchmark_Pool_Echo(b *testing.B) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ )
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ bd := make([]byte, 1024)
+ c := make([]byte, 1024)
+
+ pld := &payload.Payload{
+ Context: c,
+ Body: bd,
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(pld); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+// Benchmark_Pool_Echo_Batched-32 366996 2873 ns/op 1233 B/op 24 allocs/op
+// PTR -> Benchmark_Pool_Echo_Batched-32 406839 2900 ns/op 1059 B/op 23 allocs/op
+// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op
+func Benchmark_Pool_Echo_Batched(b *testing.B) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: uint64(runtime.NumCPU()),
+ AllocateTimeout: time.Second * 100,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(b, err)
+ defer p.Destroy(ctx)
+
+ bd := make([]byte, 1024)
+ c := make([]byte, 1024)
+
+ pld := &payload.Payload{
+ Context: c,
+ Body: bd,
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ var wg sync.WaitGroup
+ for i := 0; i < b.N; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if _, err := p.Exec(pld); err != nil {
+ b.Fail()
+ log.Println(err)
+ }
+ }()
+ }
+
+ wg.Wait()
+}
+
+// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op
+func Benchmark_Pool_Echo_Replaced(b *testing.B) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
+ pipe.NewPipeFactory(),
+ &Config{
+ NumWorkers: 1,
+ MaxJobs: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(b, err)
+ defer p.Destroy(ctx)
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for n := 0; n < b.N; n++ {
+ if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ log.Println(err)
+ }
+ }
+}
+
+// BenchmarkToStringUnsafe-12 566317729 1.91 ns/op 0 B/op 0 allocs/op
+// BenchmarkToStringUnsafe-32 1000000000 0.4434 ns/op 0 B/op 0 allocs/op
+func BenchmarkToStringUnsafe(b *testing.B) {
+ testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ res := utils.AsString(testPayload)
+ _ = res
+ }
+}
+
+// BenchmarkToStringSafe-32 8017846 182.5 ns/op 896 B/op 1 allocs/op
+// inline BenchmarkToStringSafe-12 28926276 46.6 ns/op 128 B/op 1 allocs/op
+func BenchmarkToStringSafe(b *testing.B) {
+ testPayload := []byte("falsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtojfalsflasjlifjwpoihejfoiwejow{}{}{}{}jelfjasjfhwaopiehjtopwhtgohrgouahsgkljasdlfjasl;fjals;jdflkndgouwhetopwqhjtoj")
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ res := toStringNotFun(testPayload)
+ _ = res
+ }
+}
+
+func toStringNotFun(data []byte) string {
+ return string(data)
+}
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
new file mode 100755
index 00000000..99af168c
--- /dev/null
+++ b/pool/supervisor_pool.go
@@ -0,0 +1,230 @@
+package pool
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/events"
+ "github.com/spiral/roadrunner/v2/payload"
+ "github.com/spiral/roadrunner/v2/state/process"
+ "github.com/spiral/roadrunner/v2/worker"
+)
+
+const MB = 1024 * 1024
+
+// NSEC_IN_SEC nanoseconds in second
+const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
+
+type Supervised interface {
+ Pool
+ // Start used to start watching process for all pool workers
+ Start()
+}
+
+type supervised struct {
+ cfg *SupervisorConfig
+ events events.Handler
+ pool Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
+}
+
+func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+ sp := &supervised{
+ cfg: cfg,
+ events: events,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
+ }
+
+ return sp
+}
+
+func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*payload.Payload, error) {
+ panic("used to satisfy pool interface")
+}
+
+func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("supervised_exec_with_context")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
+ defer cancel()
+
+ res, err := sp.pool.execWithTTL(ctx, rqs)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return res, nil
+}
+
+func (sp *supervised) GetConfig() interface{} {
+ return sp.pool.GetConfig()
+}
+
+func (sp *supervised) Workers() (workers []worker.BaseProcess) {
+ sp.mu.Lock()
+ defer sp.mu.Unlock()
+ return sp.pool.Workers()
+}
+
+func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
+ return sp.pool.RemoveWorker(worker)
+}
+
+func (sp *supervised) Destroy(ctx context.Context) {
+ sp.pool.Destroy(ctx)
+}
+
+func (sp *supervised) Start() {
+ go func() {
+ watchTout := time.NewTicker(sp.cfg.WatchTick)
+ for {
+ select {
+ case <-sp.stopCh:
+ watchTout.Stop()
+ return
+ // stop here
+ case <-watchTout.C:
+ sp.mu.Lock()
+ sp.control()
+ sp.mu.Unlock()
+ }
+ }
+ }()
+}
+
+func (sp *supervised) Stop() {
+ sp.stopCh <- struct{}{}
+}
+
+func (sp *supervised) control() { //nolint:gocognit
+ now := time.Now()
+
+ // MIGHT BE OUTDATED
+ // It's a copy of the Workers pointers
+ workers := sp.pool.Workers()
+
+ for i := 0; i < len(workers); i++ {
+ // if worker not in the Ready OR working state
+ // skip such worker
+ switch workers[i].State().Value() {
+ case
+ worker.StateInvalid,
+ worker.StateErrored,
+ worker.StateDestroyed,
+ worker.StateInactive,
+ worker.StateStopped,
+ worker.StateStopping,
+ worker.StateKilling:
+ continue
+ }
+
+ s, err := process.WorkerProcessState(workers[i])
+ if err != nil {
+ // worker not longer valid for supervision
+ continue
+ }
+
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
+ if workers[i].State().Value() != worker.StateWorking {
+ workers[i].State().Set(worker.StateInvalid)
+ _ = workers[i].Stop()
+ }
+ // just to double check
+ workers[i].State().Set(worker.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+ continue
+ }
+
+ if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
+ if workers[i].State().Value() != worker.StateWorking {
+ workers[i].State().Set(worker.StateInvalid)
+ _ = workers[i].Stop()
+ }
+ // just to double check
+ workers[i].State().Set(worker.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+ continue
+ }
+
+ // firs we check maxWorker idle
+ if sp.cfg.IdleTTL != 0 {
+ // then check for the worker state
+ if workers[i].State().Value() != worker.StateReady {
+ continue
+ }
+
+ /*
+ Calculate idle time
+ If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
+ 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
+ we are guessing that worker overlap idle time and has to be killed
+ */
+
+ // 1610530005534416045 lu
+ // lu - now = -7811150814 - nanoseconds
+ // 7.8 seconds
+ // get last used unix nano
+ lu := workers[i].State().LastUsed()
+ // worker not used, skip
+ if lu == 0 {
+ continue
+ }
+
+ // convert last used to unixNano and sub time.now to seconds
+ // negative number, because lu always in the past, except for the `back to the future` :)
+ res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1
+
+ // maxWorkerIdle more than diff between now and last used
+ // for example:
+ // After exec worker goes to the rest
+ // And resting for the 5 seconds
+ // IdleTTL is 1 second.
+ // After the control check, res will be 5, idle is 1
+ // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
+ if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Release
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
+ if workers[i].State().Value() != worker.StateWorking {
+ workers[i].State().Set(worker.StateInvalid)
+ _ = workers[i].Stop()
+ }
+ // just to double-check
+ workers[i].State().Set(worker.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+ }
+ }
+ }
+}
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
new file mode 100644
index 00000000..032e220b
--- /dev/null
+++ b/pool/supervisor_test.go
@@ -0,0 +1,413 @@
+package pool
+
+import (
+ "context"
+ "os"
+ "os/exec"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/events"
+ "github.com/spiral/roadrunner/v2/payload"
+ "github.com/spiral/roadrunner/v2/transport/pipe"
+ "github.com/spiral/roadrunner/v2/worker"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+var cfgSupervised = &Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 100 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+}
+
+func TestSupervisedPool_Exec(t *testing.T) {
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgSupervised,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ time.Sleep(time.Second)
+
+ pidBefore := p.Workers()[0].Pid()
+
+ for i := 0; i < 100; i++ {
+ time.Sleep(time.Millisecond * 100)
+ _, err = p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+ assert.NoError(t, err)
+ }
+
+ assert.NotEqual(t, pidBefore, p.Workers()[0].Pid())
+
+ p.Destroy(context.Background())
+}
+
+// This test should finish without freezes
+func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
+ var cfgSupervised = cfgSupervised
+ cfgSupervised.Debug = true
+
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") },
+ pipe.NewPipeFactory(),
+ cfgSupervised,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ time.Sleep(time.Second)
+
+ for i := 0; i < 100; i++ {
+ time.Sleep(time.Millisecond * 500)
+ _, err = p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+ assert.NoError(t, err)
+ }
+
+ p.Destroy(context.Background())
+}
+
+func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
+ var cfgExecTTL = &Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 1 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Error(t, err)
+ assert.Empty(t, resp)
+
+ time.Sleep(time.Second * 1)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
+ var cfgExecTTL = &Config{
+ NumWorkers: uint64(1),
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 5 * time.Second,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+ pid = p.Workers()[0].Pid()
+
+ resp, err = p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+
+ p.Destroy(context.Background())
+}
+
+func TestSupervisedPool_Idle(t *testing.T) {
+ var cfgExecTTL = &Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 1 * time.Second,
+ ExecTTL: 100 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Nil(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 5)
+
+ // worker should be marked as invalid and reallocated
+ _, err = p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+ assert.NoError(t, err)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ p.Destroy(context.Background())
+}
+
+func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
+ var cfgExecTTL = &Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 1 * time.Second,
+ IdleTTL: 1 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 2)
+ // should be destroyed, state should be Ready, not Invalid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ assert.Equal(t, int64(1), p.Workers()[0].State().Value())
+}
+
+func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
+ var cfgExecTTL = &Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 1)
+ // should be the same pid
+ assert.Equal(t, pid, p.Workers()[0].Pid())
+}
+
+func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
+ var cfgExecTTL = &Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
+ MaxWorkerMemory: 1,
+ },
+ }
+
+ block := make(chan struct{}, 10)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.PoolEvent); ok {
+ if ev.Event == events.EventMaxMemory {
+ block <- struct{}{}
+ }
+ }
+ }
+
+ // constructed
+ // max memory
+ // constructed
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ AddListeners(listener),
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ resp, err := p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ <-block
+ p.Destroy(context.Background())
+}
+
+func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
+ var cfgExecTTL = &Config{
+ NumWorkers: uint64(2),
+ AllocateTimeout: time.Second * 15,
+ DestroyTimeout: time.Second * 5,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 5 * time.Second,
+ },
+ }
+
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ require.NotNil(t, p)
+
+ time.Sleep(time.Second)
+
+ // should be ok
+ _, err = p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ require.NoError(t, err)
+
+ // after creating this file, PHP will fail
+ file, err := os.Create("break")
+ require.NoError(t, err)
+
+ time.Sleep(time.Second * 5)
+ assert.NoError(t, file.Close())
+ assert.NoError(t, os.Remove("break"))
+
+ defer func() {
+ if r := recover(); r != nil {
+ assert.Fail(t, "panic should not be fired!")
+ } else {
+ p.Destroy(context.Background())
+ }
+ }()
+}