diff options
author | Valery Piashchynski <[email protected]> | 2021-10-27 22:50:03 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-10-27 22:50:03 +0300 |
commit | c8c3f9f113eae13aa37cf92043b288bb0c68a622 (patch) | |
tree | 42f8ab386735d5f8b002907d07249e94b4c10a12 /pool | |
parent | 1f62e21020cc3014e9eb2dc33c154de6dd5b22d5 (diff) | |
parent | ab591e7f122e28857cef00c905a8125992ea3cdf (diff) |
[#838]: feat(events): events package deep refactoringv2.6.0-alpha.1
[#838]: feat(events): events package deep refactoring
Diffstat (limited to 'pool')
-rwxr-xr-x | pool/static_pool.go | 68 | ||||
-rwxr-xr-x | pool/static_pool_test.go | 61 | ||||
-rwxr-xr-x | pool/supervisor_pool.go | 18 | ||||
-rw-r--r-- | pool/supervisor_test.go | 21 |
4 files changed, 75 insertions, 93 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go index 91bd1c2c..11112e72 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "os/exec" "time" @@ -14,8 +15,12 @@ import ( workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher" ) -// StopRequest can be sent by worker to indicate that restart is required. -const StopRequest = "{\"stop\":true}" +const ( + // StopRequest can be sent by worker to indicate that restart is required. + StopRequest = `{"stop":true}` + // pluginName ... + pluginName = "pool" +) // ErrorEncoder encode error or make a decision based on the error type type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error) @@ -34,11 +39,8 @@ type StaticPool struct { // creates and connects to stack factory transport.Factory - // distributes the events - events events.Handler - - // saved list of event listeners - listeners []events.Listener + events events.EventBus + eventsID string // manages worker states and TTLs ww Watcher @@ -62,11 +64,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg cfg.MaxJobs = 1 } + eb, id := events.Bus() p := &StaticPool{ - cfg: cfg, - cmd: cmd, - factory: factory, - events: events.NewEventsHandler(), + cfg: cfg, + cmd: cmd, + factory: factory, + events: eb, + eventsID: id, } // add pool options @@ -77,7 +81,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout) // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) @@ -95,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { - sp := supervisorWrapper(p, p.events, p.cfg.Supervisor) + sp := supervisorWrapper(p, eb, p.cfg.Supervisor) // start watcher timer sp.Start() return sp, nil @@ -104,20 +108,6 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg return p, nil } -func AddListeners(listeners ...events.Listener) Options { - return func(p *StaticPool) { - p.listeners = listeners - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - -// AddListener connects event listener to the pool. -func (sp *StaticPool) addListener(listener events.Listener) { - sp.events.AddListener(listener) -} - // GetConfig returns associated pool configuration. Immutable. func (sp *StaticPool) GetConfig() interface{} { return sp.cfg @@ -205,7 +195,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { w.State().Set(worker.StateInvalid) err := w.Stop() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()))) } } @@ -227,7 +217,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)}) + sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("error: %s", err))) return nil, errors.E(op, err) } // else if err not nil - return error @@ -238,6 +228,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work // Destroy all underlying stack (but let them complete the task). func (sp *StaticPool) Destroy(ctx context.Context) { + sp.events.Unsubscribe(sp.eventsID) sp.ww.Destroy(ctx) } @@ -246,12 +237,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err}) + sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("error: %s", err))) w.State().Set(worker.StateInvalid) return nil, err case errors.Is(errors.SoftJob, err): - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // if max jobs exceed if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { @@ -272,7 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.Network, err): // in case of network error, we can't stop the worker, we should kill it w.State().Set(worker.StateInvalid) - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // kill the worker instead of sending net packet to it _ = w.Kill() @@ -280,7 +271,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return nil, err default: w.State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // stop the worker, worker here might be in the broken state (network) errS := w.Stop() if errS != nil { @@ -296,7 +287,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return func() (worker.SyncWorker, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) defer cancel() - w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...) + w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd()) if err != nil { return nil, err } @@ -304,10 +295,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio // wrap sync worker sw := worker.From(w) - sp.events.Push(events.PoolEvent{ - Event: events.EventWorkerConstruct, - Payload: sw, - }) + sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("pid: %d", sw.Pid()))) return sw, nil } } @@ -329,7 +317,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { sw.State().Set(worker.StateDestroyed) err = sw.Kill() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) return nil, err } @@ -346,7 +334,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // redirect call to the worker with TTL r, err := sw.ExecWithTTL(ctx, p) if stopErr := sw.Stop(); stopErr != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) } return r, err diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index 9861f0d8..717d301e 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -18,6 +18,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var cfg = &Config{ @@ -167,26 +168,18 @@ func Test_StaticPool_JobError(t *testing.T) { func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } + + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") }, pipe.NewPipeFactory(), cfg, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -196,22 +189,23 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + event := <-ch + if !strings.Contains(event.Message(), "undefined_function()") { + t.Fatal("event should contain undefiled function()") + } p.Destroy(ctx) } func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx := context.Background() + // Run pool events - ev := make(chan struct{}, 1) - listener := func(event interface{}) { - if pe, ok := event.(events.PoolEvent); ok { - if pe.Event == events.EventWorkerConstruct { - ev <- struct{}{} - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch) + require.NoError(t, err) var cfg2 = &Config{ NumWorkers: 1, @@ -224,7 +218,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), cfg2, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -242,7 +235,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.Equal(t, 1, len(p.Workers())) // first creation - <-ev + <-ch // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill() if err != nil { @@ -250,7 +243,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } // re-creation - <-ev + <-ch list := p.Workers() for _, w := range list { @@ -496,15 +489,12 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventNoFreeWorkers { - block <- struct{}{} - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch) + require.NoError(t, err) p, err := Initialize( ctx, @@ -518,7 +508,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { DestroyTimeout: time.Second, Supervisor: nil, }, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -532,7 +521,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + <-ch p.Destroy(ctx) } diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index 99af168c..1a94f6a0 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "sync" "time" @@ -12,7 +13,10 @@ import ( "github.com/spiral/roadrunner/v2/worker" ) -const MB = 1024 * 1024 +const ( + MB = 1024 * 1024 + supervisorName string = "supervisor" +) // NSEC_IN_SEC nanoseconds in second const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck @@ -25,16 +29,16 @@ type Supervised interface { type supervised struct { cfg *SupervisorConfig - events events.Handler + events events.EventBus pool Pool stopCh chan struct{} mu *sync.RWMutex } -func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised { +func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised { sp := &supervised{ cfg: cfg, - events: events, + events: eb, pool: pool, mu: &sync.RWMutex{}, stopCh: make(chan struct{}), @@ -148,7 +152,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) + sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) continue } @@ -168,7 +172,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) + sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) continue } @@ -223,7 +227,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double-check workers[i].State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) + sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) } } } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index aca379c6..eb3c37dd 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -265,6 +265,11 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { assert.Empty(t, resp.Context) time.Sleep(time.Second * 2) + + if len(p.Workers()) < 1 { + t.Fatal("should be at least 1 worker") + return + } // should be destroyed, state should be Ready, not Invalid assert.NotEqual(t, pid, p.Workers()[0].Pid()) assert.Equal(t, int64(1), p.Workers()[0].State().Value()) @@ -326,14 +331,11 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { }, } - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventMaxMemory { - block <- struct{}{} - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch) + require.NoError(t, err) // constructed // max memory @@ -344,7 +346,6 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, pipe.NewPipeFactory(), cfgExecTTL, - AddListeners(listener), ) assert.NoError(t, err) @@ -359,7 +360,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { assert.Empty(t, resp.Body) assert.Empty(t, resp.Context) - <-block + <-ch p.Destroy(context.Background()) } |