Diffstat (limited to 'pool')
-rwxr-xr-x  pool/static_pool.go       61
-rwxr-xr-x  pool/static_pool_test.go  84
-rwxr-xr-x  pool/supervisor_pool.go   17
-rw-r--r--  pool/supervisor_test.go   29
4 files changed, 83 insertions, 108 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 7481f84f..019c34b2 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -2,7 +2,6 @@ package pool
 
 import (
 	"context"
-	"fmt"
 	"os/exec"
 	"time"
 
@@ -13,13 +12,12 @@ import (
 	"github.com/spiral/roadrunner/v2/utils"
 	"github.com/spiral/roadrunner/v2/worker"
 	workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
+	"go.uber.org/zap"
 )
 
 const (
 	// StopRequest can be sent by worker to indicate that restart is required.
 	StopRequest = `{"stop":true}`
-	// pluginName ...
-	pluginName = "pool"
 )
 
 // ErrorEncoder encode error or make a decision based on the error type
@@ -32,6 +30,7 @@ type Command func() *exec.Cmd
 // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
 type StaticPool struct {
 	cfg *Config
+	log *zap.Logger
 
 	// worker command creator
 	cmd Command
@@ -39,9 +38,6 @@ type StaticPool struct {
 	// creates and connects to stack
 	factory transport.Factory
 
-	events   events.EventBus
-	eventsID string
-
 	// manages worker states and TTLs
 	ww Watcher
 
@@ -52,8 +48,8 @@ type StaticPool struct {
 	errEncoder ErrorEncoder
 }
 
-// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
+// NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
+func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
 	if factory == nil {
 		return nil, errors.Str("no factory initialized")
 	}
@@ -64,13 +60,10 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 		cfg.MaxJobs = 1
 	}
 
-	eb, id := events.Bus()
 	p := &StaticPool{
-		cfg:      cfg,
-		cmd:      cmd,
-		factory:  factory,
-		events:   eb,
-		eventsID: id,
+		cfg:     cfg,
+		cmd:     cmd,
+		factory: factory,
 	}
 
 	// add pool options
@@ -78,10 +71,19 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 		options[i](p)
 	}
 
+	if p.log == nil {
+		z, err := zap.NewProduction()
+		if err != nil {
+			return nil, err
+		}
+
+		p.log = z
+	}
+
 	// set up workers allocator
 	p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
 	// set up workers watcher
-	p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
+	p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.log, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
 
 	// allocate requested number of workers
 	workers, err := p.allocateWorkers(p.cfg.NumWorkers)
@@ -99,7 +101,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 
 	// if supervised config not nil, guess, that pool wanted to be supervised
 	if cfg.Supervisor != nil {
-		sp := supervisorWrapper(p, eb, p.cfg.Supervisor)
+		sp := supervisorWrapper(p, p.log, p.cfg.Supervisor)
 		// start watcher timer
 		sp.Start()
 		return sp, nil
@@ -108,6 +110,12 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 	return p, nil
 }
 
+func WithLogger(z *zap.Logger) Options {
+	return func(p *StaticPool) {
+		p.log = z
+	}
+}
+
 // GetConfig returns associated pool configuration. Immutable.
 func (sp *StaticPool) GetConfig() interface{} {
 	return sp.cfg
@@ -158,7 +166,6 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
 
 // Destroy all underlying stack (but let them complete the task).
 func (sp *StaticPool) Destroy(ctx context.Context) {
-	sp.events.Unsubscribe(sp.eventsID)
 	sp.ww.Destroy(ctx)
 }
 
@@ -183,13 +190,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 		// just push event if on any stage was timeout error
 		switch {
 		case errors.Is(errors.ExecTTL, err):
-			sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
+			sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err))
 			w.State().Set(worker.StateInvalid)
 			return nil, err
 
 		case errors.Is(errors.SoftJob, err):
-			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+			sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "worker error"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 			// if max jobs exceed
 			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
 				// mark old as invalid and stop
@@ -209,15 +215,14 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 		case errors.Is(errors.Network, err):
 			// in case of network error, we can't stop the worker, we should kill it
 			w.State().Set(worker.StateInvalid)
-			sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+			sp.log.Warn("network error, worker will be restarted", zap.String("reason", "network"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 			// kill the worker instead of sending net packet to it
 			_ = w.Kill()
 
 			return nil, err
 		default:
 			w.State().Set(worker.StateInvalid)
-			sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+			sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err))
 			// stop the worker, worker here might be in the broken state (network)
 			errS := w.Stop()
 			if errS != nil {
@@ -268,7 +273,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
 	w.State().Set(worker.StateInvalid)
 	err := w.Stop()
 	if err != nil {
-		sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, worker's pid: %d", err.Error(), w.Pid())))
+		sp.log.Warn("user requested worker to be stopped", zap.String("reason", "user event"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 	}
 }
 
@@ -289,7 +294,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
 	if err != nil {
 		// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
 		if errors.Is(errors.NoFreeWorkers, err) {
-			sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("no free workers in the pool, error: %s", err)))
+			sp.log.Error("no free workers in the pool, wait timeout exceed", zap.String("reason", "no free workers"), zap.String("internal_event_name", events.EventNoFreeWorkers.String()), zap.Error(err))
 			return nil, errors.E(op, err)
 		}
 		// else if err not nil - return error
@@ -310,7 +315,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
 		// wrap sync worker
 		sw := worker.From(w)
 
-		sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("worker allocated, pid: %d", sw.Pid())))
+		sp.log.Debug("worker is allocated", zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
 		return sw, nil
 	}
 }
@@ -336,7 +341,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
 	// destroy the worker
 	err = sw.Stop()
 	if err != nil {
-		sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+		sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 		return nil, err
 	}
 
@@ -363,7 +368,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
 
 	err = sw.Stop()
 	if err != nil {
-		sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+		sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
 		return nil, err
 	}
 
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index a45aa29d..4f98ca91 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -6,19 +6,18 @@ import (
 	"os/exec"
 	"runtime"
 	"strconv"
-	"strings"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/spiral/errors"
-	"github.com/spiral/roadrunner/v2/events"
 	"github.com/spiral/roadrunner/v2/payload"
 	"github.com/spiral/roadrunner/v2/transport/pipe"
 	"github.com/spiral/roadrunner/v2/utils"
 	"github.com/spiral/roadrunner/v2/worker"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/zap"
 )
 
 var cfg = &Config{
@@ -29,7 +28,7 @@ var cfg = &Config{
 
 func Test_NewPool(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -44,7 +43,7 @@ func Test_NewPool(t *testing.T) {
 
 func Test_NewPoolReset(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -71,7 +70,7 @@ func Test_NewPoolReset(t *testing.T) {
 }
 
 func Test_StaticPool_Invalid(t *testing.T) {
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		context.Background(),
 		func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") },
 		pipe.NewPipeFactory(),
@@ -83,7 +82,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
 }
 
 func Test_ConfigNoErrorInitDefaults(t *testing.T) {
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		context.Background(),
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -100,7 +99,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
 
 func Test_StaticPool_Echo(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -124,7 +123,7 @@ func Test_StaticPool_Echo(t *testing.T) {
 
 func Test_StaticPool_Echo_NilContext(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -148,7 +147,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
 
 func Test_StaticPool_Echo_Context(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") },
 		pipe.NewPipeFactory(),
@@ -172,7 +171,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
 
 func Test_StaticPool_JobError(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") },
 		pipe.NewPipeFactory(),
@@ -198,17 +197,15 @@ func Test_StaticPool_JobError(t *testing.T) {
 
 func Test_StaticPool_Broken_Replace(t *testing.T) {
 	ctx := context.Background()
-	eb, id := events.Bus()
-	defer eb.Unsubscribe(id)
-	ch := make(chan events.Event, 10)
-	err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+	z, err := zap.NewProduction()
 	require.NoError(t, err)
 
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
 		pipe.NewPipeFactory(),
 		cfg,
+		WithLogger(z),
 	)
 	assert.NoError(t, err)
 	assert.NotNil(t, p)
@@ -218,31 +215,19 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
 	assert.Error(t, err)
 	assert.Nil(t, res)
 
-	event := <-ch
-	if !strings.Contains(event.Message(), "undefined_function()") {
-		t.Fatal("event should contain undefiled function()")
-	}
-
 	p.Destroy(ctx)
 }
 
 func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 	ctx := context.Background()
 
-	// Run pool events
-	eb, id := events.Bus()
-	defer eb.Unsubscribe(id)
-	ch := make(chan events.Event, 10)
-	err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
-	require.NoError(t, err)
-
 	var cfg2 = &Config{
 		NumWorkers:      1,
 		AllocateTimeout: time.Second * 5,
 		DestroyTimeout:  time.Second * 5,
 	}
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -264,7 +249,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 	assert.Equal(t, 1, len(p.Workers()))
 
 	// first creation
-	<-ch
+	time.Sleep(time.Second * 2)
 	// killing random worker and expecting pool to replace it
 	err = p.Workers()[0].Kill()
 	if err != nil {
@@ -272,8 +257,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 	}
 
 	// re-creation
-	<-ch
-
+	time.Sleep(time.Second * 2)
 	list := p.Workers()
 	for _, w := range list {
 		assert.Equal(t, worker.StateReady, w.State().Value())
@@ -281,7 +265,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 }
 
 func Test_StaticPool_AllocateTimeout(t *testing.T) {
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		context.Background(),
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
 		pipe.NewPipeFactory(),
@@ -300,7 +284,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
 
 func Test_StaticPool_Replace_Worker(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
 		pipe.NewPipeFactory(),
@@ -339,7 +323,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
 
 func Test_StaticPool_Debug_Worker(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
 		pipe.NewPipeFactory(),
@@ -381,7 +365,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
 // identical to replace but controlled on worker side
 func Test_StaticPool_Stop_Worker(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") },
 		pipe.NewPipeFactory(),
@@ -422,7 +406,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
 // identical to replace but controlled on worker side
 func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
 		pipe.NewPipeFactory(),
@@ -444,7 +428,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
 // identical to replace but controlled on worker side
 func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
 		pipe.NewPipeFactory(),
@@ -474,7 +458,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
 // identical to replace but controlled on worker side
 func Test_Static_Pool_Handle_Dead(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		context.Background(),
 		func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -499,7 +483,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
 
 // identical to replace but controlled on worker side
 func Test_Static_Pool_Slow_Destroy(t *testing.T) {
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		context.Background(),
 		func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -519,13 +503,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
 
 func Test_StaticPool_NoFreeWorkers(t *testing.T) {
 	ctx := context.Background()
-	eb, id := events.Bus()
-	defer eb.Unsubscribe(id)
-	ch := make(chan events.Event, 10)
-	err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
-	require.NoError(t, err)
-
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		// sleep for the 3 seconds
 		func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
@@ -550,14 +528,14 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
 	assert.Error(t, err)
 	assert.Nil(t, res)
 
-	<-ch
+	time.Sleep(time.Second)
 
 	p.Destroy(ctx)
 }
 
 // identical to replace but controlled on worker side
 func Test_Static_Pool_WrongCommand1(t *testing.T) {
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		context.Background(),
 		func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -574,7 +552,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
 
 // identical to replace but controlled on worker side
 func Test_Static_Pool_WrongCommand2(t *testing.T) {
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		context.Background(),
 		func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -591,7 +569,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
 
 func Test_CRC_WithPayload(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") },
 		pipe.NewPipeFactory(),
@@ -623,7 +601,7 @@ Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allo
 */
 func Benchmark_Pool_Echo(b *testing.B) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -655,7 +633,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
 // PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op
 func Benchmark_Pool_Echo_Batched(b *testing.B) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -697,7 +675,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
 // Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op
 func Benchmark_Pool_Echo_Replaced(b *testing.B) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 0502dc9a..59834859 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -2,7 +2,6 @@ package pool
 
 import (
 	"context"
-	"fmt"
 	"sync"
 	"time"
 
@@ -11,11 +10,11 @@ import (
 	"github.com/spiral/roadrunner/v2/payload"
 	"github.com/spiral/roadrunner/v2/state/process"
 	"github.com/spiral/roadrunner/v2/worker"
+	"go.uber.org/zap"
 )
 
 const (
-	MB             = 1024 * 1024
-	supervisorName string = "supervisor"
+	MB = 1024 * 1024
 )
 
 // NSEC_IN_SEC nanoseconds in second
@@ -29,17 +28,17 @@ type Supervised interface {
 
 type supervised struct {
 	cfg    *SupervisorConfig
-	events events.EventBus
 	pool   Pool
+	log    *zap.Logger
 	stopCh chan struct{}
 	mu     *sync.RWMutex
 }
 
-func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, log *zap.Logger, cfg *SupervisorConfig) *supervised {
 	sp := &supervised{
 		cfg:    cfg,
-		events: eb,
 		pool:   pool,
+		log:    log,
 		mu:     &sync.RWMutex{},
 		stopCh: make(chan struct{}),
 	}
@@ -166,7 +165,7 @@ func (sp *supervised) control() { //nolint:gocognit
 			}
 			// just to double check
 			workers[i].State().Set(worker.StateInvalid)
-			sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("ttl reached, worker's pid: %d", workers[i].Pid())))
+			sp.log.Debug("ttl", zap.String("reason", "ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String()))
 			continue
 		}
 
@@ -186,7 +185,7 @@ func (sp *supervised) control() { //nolint:gocognit
 			}
 			// just to double check
 			workers[i].State().Set(worker.StateInvalid)
-			sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("max memory reached, worker's pid: %d", workers[i].Pid())))
+			sp.log.Debug("memory_limit", zap.String("reason", "max memory is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventMaxMemory.String()))
 			continue
 		}
 
@@ -241,7 +240,7 @@ func (sp *supervised) control() { //nolint:gocognit
 			}
 			// just to double-check
 			workers[i].State().Set(worker.StateInvalid)
-			sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("idle ttl reached, worker's pid: %d", workers[i].Pid())))
+			sp.log.Debug("idle_ttl", zap.String("reason", "idle ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String()))
 		}
 	}
 }
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 6e8ab552..6ff62316 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -7,7 +7,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/spiral/roadrunner/v2/events"
 	"github.com/spiral/roadrunner/v2/payload"
 	"github.com/spiral/roadrunner/v2/transport/pipe"
 	"github.com/spiral/roadrunner/v2/worker"
@@ -30,7 +29,7 @@ var cfgSupervised = &Config{
 
 func TestSupervisedPool_Exec(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
 		pipe.NewPipeFactory(),
@@ -60,7 +59,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
 
 func Test_SupervisedPoolReset(t *testing.T) {
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
@@ -91,7 +90,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
 	cfgSupervised.Debug = true
 
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") },
 		pipe.NewPipeFactory(),
@@ -129,7 +128,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
 		},
 	}
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
 		pipe.NewPipeFactory(),
@@ -164,7 +163,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
 		},
 	}
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") },
 		pipe.NewPipeFactory(),
@@ -221,7 +220,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
 		},
 	}
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") },
 		pipe.NewPipeFactory(),
@@ -271,7 +270,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
 		},
 	}
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
 		pipe.NewPipeFactory(),
@@ -319,7 +318,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
 		},
 	}
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
 		pipe.NewPipeFactory(),
@@ -361,17 +360,11 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
 		},
 	}
 
-	eb, id := events.Bus()
-	defer eb.Unsubscribe(id)
-	ch := make(chan events.Event, 10)
-	err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
-	require.NoError(t, err)
-
 	// constructed
 	// max memory
 	// constructed
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
 		pipe.NewPipeFactory(),
@@ -390,7 +383,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
 	assert.Empty(t, resp.Body)
 	assert.Empty(t, resp.Context)
 
-	<-ch
+	time.Sleep(time.Second)
 	p.Destroy(context.Background())
 }
 
@@ -406,7 +399,7 @@ func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	p, err := Initialize(
+	p, err := NewStaticPool(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") },
 		pipe.NewPipeFactory(),
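Note: the API change in this diff replaces the internal events bus with a zap logger. The pool constructor is renamed from Initialize to NewStaticPool, and callers may inject a logger through the new WithLogger option; if none is supplied, the pool falls back to zap.NewProduction(). Below is a minimal caller-side sketch of the new API, assuming the package is imported as github.com/spiral/roadrunner/v2/pool and a PHP worker script comparable to the ones used in the tests; the worker path and Config values are illustrative, not taken from this change.

package main

import (
	"context"
	"os/exec"
	"time"

	"github.com/spiral/roadrunner/v2/pool"
	"github.com/spiral/roadrunner/v2/transport/pipe"
	"go.uber.org/zap"
)

func main() {
	// Build a zap logger up front; NewStaticPool would otherwise
	// create its own via zap.NewProduction().
	z, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}

	p, err := pool.NewStaticPool(
		context.Background(),
		// worker command creator (script path is a placeholder)
		func() *exec.Cmd { return exec.Command("php", "worker.php", "pipes") },
		pipe.NewPipeFactory(),
		&pool.Config{
			NumWorkers:      4,
			AllocateTimeout: time.Second * 5,
			DestroyTimeout:  time.Second * 5,
		},
		pool.WithLogger(z), // replaces the former events-bus subscription
	)
	if err != nil {
		panic(err)
	}
	defer p.Destroy(context.Background())
}

With this shape, log consumers no longer subscribe to pool events such as pool.EventNoFreeWorkers; the equivalent information is emitted as structured zap fields (reason, pid, internal_event_name) on the injected logger.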