author     Valery Piashchynski <[email protected]>    2021-10-26 19:22:09 +0300
committer  Valery Piashchynski <[email protected]>    2021-10-26 19:22:09 +0300
commit     9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch)
tree       8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /pool
parent     160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff)
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
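
This commit replaces the callback-based events.Handler/events.Listener API with a shared event bus: subscribers obtain the bus via events.Bus(), subscribe by a "plugin.EventType" pattern, and receive events over a channel, as the updated tests below do. A minimal consumer sketch based on the calls visible in this diff; the events import path is an assumption (it is not shown in the hunks, but matches the other v2 imports):

package main

import (
	"fmt"

	// Assumed import path, consistent with the other v2 imports in this diff.
	"github.com/spiral/roadrunner/v2/events"
)

func main() {
	// Bus() returns the shared event bus and a unique subscriber ID.
	eb, id := events.Bus()
	// Unsubscribe releases the subscription, as StaticPool.Destroy now does.
	defer eb.Unsubscribe(id)

	// Events arrive over a channel instead of a listener callback.
	ch := make(chan events.Event, 10)

	// Subscribe by "<plugin>.<event type>" pattern, as the updated tests do.
	if err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch); err != nil {
		panic(err)
	}

	// Message() carries the formatted payload built by the emitter.
	ev := <-ch
	fmt.Println(ev.Message())
}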
Diffstat (limited to 'pool')
-rwxr-xr-x  pool/static_pool.go       102
-rwxr-xr-x  pool/static_pool_test.go   58
-rwxr-xr-x  pool/supervisor_pool.go    49
-rw-r--r--  pool/supervisor_test.go    15

4 files changed, 124 insertions, 100 deletions
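
On the emitting side, events.Push(events.PoolEvent{...}) and events.Push(events.WorkerEvent{...}) become events.Send(&events.RREvent{...}), where T is the event type, P the emitting plugin, and M a preformatted message. A sketch of that pattern, mirroring the hunks below; the emitWorkerError helper and the int64 pid parameter are illustrative, not part of the commit:

package pool

import (
	"fmt"

	"github.com/spiral/roadrunner/v2/events" // assumed path, as above
)

// emitWorkerError is a hypothetical helper condensing the emission pattern
// introduced in this commit: typed event structs are replaced with
// *events.RREvent carrying type (T), plugin (P), and message (M).
func emitWorkerError(eb events.EventBus, err error, pid int64) {
	eb.Send(&events.RREvent{
		T: events.EventWorkerError,
		P: "pool", // the new pluginName constant in static_pool.go
		M: fmt.Sprintf("error: %s, pid: %d", err, pid),
	})
}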
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 91bd1c2c..27db830c 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -2,6 +2,7 @@ package pool
 
 import (
 	"context"
+	"fmt"
 	"os/exec"
 	"time"
 
@@ -14,8 +15,12 @@ import (
 	workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
 )
 
-// StopRequest can be sent by worker to indicate that restart is required.
-const StopRequest = "{\"stop\":true}"
+const (
+	// StopRequest can be sent by worker to indicate that restart is required.
+	StopRequest = `{"stop":true}`
+	// pluginName ...
+	pluginName = "pool"
+)
 
 // ErrorEncoder encode error or make a decision based on the error type
 type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)
@@ -34,11 +39,8 @@ type StaticPool struct {
 	// creates and connects to stack
 	factory transport.Factory
 
-	// distributes the events
-	events events.Handler
-
-	// saved list of event listeners
-	listeners []events.Listener
+	events   events.EventBus
+	eventsID string
 
 	// manages worker states and TTLs
 	ww Watcher
@@ -62,11 +64,13 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 		cfg.MaxJobs = 1
 	}
 
+	eb, id := events.Bus()
 	p := &StaticPool{
-		cfg:     cfg,
-		cmd:     cmd,
-		factory: factory,
-		events:  events.NewEventsHandler(),
+		cfg:      cfg,
+		cmd:      cmd,
+		factory:  factory,
+		events:   eb,
+		eventsID: id,
 	}
 
 	// add pool options
@@ -77,7 +81,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 	// set up workers allocator
 	p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
 	// set up workers watcher
-	p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout)
+	p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
 
 	// allocate requested number of workers
 	workers, err := p.allocateWorkers(p.cfg.NumWorkers)
@@ -95,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 
 	// if supervised config not nil, guess, that pool wanted to be supervised
 	if cfg.Supervisor != nil {
-		sp := supervisorWrapper(p, p.events, p.cfg.Supervisor)
+		sp := supervisorWrapper(p, p.cfg.Supervisor)
 		// start watcher timer
 		sp.Start()
 		return sp, nil
@@ -104,20 +108,6 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
 	return p, nil
 }
 
-func AddListeners(listeners ...events.Listener) Options {
-	return func(p *StaticPool) {
-		p.listeners = listeners
-		for i := 0; i < len(listeners); i++ {
-			p.addListener(listeners[i])
-		}
-	}
-}
-
-// AddListener connects event listener to the pool.
-func (sp *StaticPool) addListener(listener events.Listener) {
-	sp.events.AddListener(listener)
-}
-
 // GetConfig returns associated pool configuration. Immutable.
 func (sp *StaticPool) GetConfig() interface{} {
 	return sp.cfg
@@ -205,7 +195,11 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
 	w.State().Set(worker.StateInvalid)
 	err := w.Stop()
 	if err != nil {
-		sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+		sp.events.Send(&events.RREvent{
+			T: events.EventWorkerError,
+			P: pluginName,
+			M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()),
+		})
 	}
 }
@@ -227,7 +221,11 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
 	if err != nil {
 		// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
 		if errors.Is(errors.NoFreeWorkers, err) {
-			sp.events.Push(events.PoolEvent{Event: events.EventNoFreeWorkers, Error: errors.E(op, err)})
+			sp.events.Send(&events.RREvent{
+				T: events.EventNoFreeWorkers,
+				P: pluginName,
+				M: fmt.Sprintf("error: %s", err),
+			})
 			return nil, errors.E(op, err)
 		}
 		// else if err not nil - return error
@@ -238,6 +236,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
 
 // Destroy all underlying stack (but let them complete the task).
 func (sp *StaticPool) Destroy(ctx context.Context) {
+	sp.events.Unsubscribe(sp.eventsID)
 	sp.ww.Destroy(ctx)
 }
 
@@ -246,12 +245,20 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 		// just push event if on any stage was timeout error
 		switch {
 		case errors.Is(errors.ExecTTL, err):
-			sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Error: err})
+			sp.events.Send(&events.RREvent{
+				T: events.EventExecTTL,
+				P: pluginName,
+				M: fmt.Sprintf("error: %s", err),
+			})
 			w.State().Set(worker.StateInvalid)
 			return nil, err
 
 		case errors.Is(errors.SoftJob, err):
-			sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+			sp.events.Send(&events.RREvent{
+				T: events.EventWorkerError,
+				P: pluginName,
+				M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
+			})
 
 			// if max jobs exceed
 			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -272,7 +279,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 		case errors.Is(errors.Network, err):
 			// in case of network error, we can't stop the worker, we should kill it
 			w.State().Set(worker.StateInvalid)
-			sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
+			sp.events.Send(&events.RREvent{
+				T: events.EventWorkerError,
+				P: pluginName,
+				M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
+			})
 
 			// kill the worker instead of sending net packet to it
 			_ = w.Kill()
@@ -280,7 +291,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 			return nil, err
 		default:
 			w.State().Set(worker.StateInvalid)
-			sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+			sp.events.Send(&events.RREvent{
+				T: events.EventWorkerDestruct,
+				P: pluginName,
+				M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
+			})
 			// stop the worker, worker here might be in the broken state (network)
 			errS := w.Stop()
 			if errS != nil {
@@ -296,7 +311,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
 	return func() (worker.SyncWorker, error) {
 		ctxT, cancel := context.WithTimeout(ctx, timeout)
 		defer cancel()
-		w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...)
+		w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd())
 		if err != nil {
 			return nil, err
 		}
@@ -304,9 +319,10 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
 		// wrap sync worker
 		sw := worker.From(w)
 
-		sp.events.Push(events.PoolEvent{
-			Event:   events.EventWorkerConstruct,
-			Payload: sw,
+		sp.events.Send(&events.RREvent{
+			T: events.EventWorkerConstruct,
+			P: pluginName,
+			M: fmt.Sprintf("pid: %d", sw.Pid()),
 		})
 		return sw, nil
 	}
@@ -329,7 +345,11 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
 	sw.State().Set(worker.StateDestroyed)
 	err = sw.Kill()
 	if err != nil {
-		sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+		sp.events.Send(&events.RREvent{
+			T: events.EventWorkerError,
+			P: pluginName,
+			M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
+		})
 		return nil, err
 	}
 
@@ -346,7 +366,11 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
 	// redirect call to the worker with TTL
 	r, err := sw.ExecWithTTL(ctx, p)
 	if stopErr := sw.Stop(); stopErr != nil {
-		sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+		sp.events.Send(&events.RREvent{
+			T: events.EventWorkerError,
+			P: pluginName,
+			M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
+		})
 	}
 
 	return r, err
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 9861f0d8..abef3779 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/spiral/roadrunner/v2/utils"
 	"github.com/spiral/roadrunner/v2/worker"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 var cfg = &Config{
@@ -167,26 +168,17 @@ func Test_StaticPool_JobError(t *testing.T) {
 
 func Test_StaticPool_Broken_Replace(t *testing.T) {
 	ctx := context.Background()
-	block := make(chan struct{}, 10)
-
-	listener := func(event interface{}) {
-		if wev, ok := event.(events.WorkerEvent); ok {
-			if wev.Event == events.EventWorkerStderr {
-				e := string(wev.Payload.([]byte))
-				if strings.ContainsAny(e, "undefined_function()") {
-					block <- struct{}{}
-					return
-				}
-			}
-		}
-	}
+
+	eb, id := events.Bus()
+	ch := make(chan events.Event, 10)
+	err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+	require.NoError(t, err)
 
 	p, err := Initialize(
 		ctx,
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
 		pipe.NewPipeFactory(),
 		cfg,
-		AddListeners(listener),
 	)
 	assert.NoError(t, err)
 	assert.NotNil(t, p)
@@ -196,22 +188,22 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
 	assert.Error(t, err)
 	assert.Nil(t, res)
 
-	<-block
+	event := <-ch
+	if !strings.Contains(event.Message(), "undefined_function()") {
+		t.Fatal("event should contain undefiled function()")
+	}
 
 	p.Destroy(ctx)
 }
 
 func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 	ctx := context.Background()
+	// Run pool events
-	ev := make(chan struct{}, 1)
-	listener := func(event interface{}) {
-		if pe, ok := event.(events.PoolEvent); ok {
-			if pe.Event == events.EventWorkerConstruct {
-				ev <- struct{}{}
-			}
-		}
-	}
+	eb, id := events.Bus()
+	ch := make(chan events.Event, 10)
+	err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
+	require.NoError(t, err)
 
 	var cfg2 = &Config{
 		NumWorkers: 1,
@@ -224,7 +216,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 		func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
 		pipe.NewPipeFactory(),
 		cfg2,
-		AddListeners(listener),
 	)
 	assert.NoError(t, err)
 	assert.NotNil(t, p)
@@ -242,7 +233,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 	assert.Equal(t, 1, len(p.Workers()))
 
 	// first creation
-	<-ev
+	<-ch
 	// killing random worker and expecting pool to replace it
 	err = p.Workers()[0].Kill()
 	if err != nil {
@@ -250,7 +241,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
 	}
 
 	// re-creation
-	<-ev
+	<-ch
 
 	list := p.Workers()
 	for _, w := range list {
@@ -496,15 +487,11 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
 
 func Test_StaticPool_NoFreeWorkers(t *testing.T) {
 	ctx := context.Background()
-	block := make(chan struct{}, 10)
-	listener := func(event interface{}) {
-		if ev, ok := event.(events.PoolEvent); ok {
-			if ev.Event == events.EventNoFreeWorkers {
-				block <- struct{}{}
-			}
-		}
-	}
+
+	eb, id := events.Bus()
+	ch := make(chan events.Event, 10)
+	err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
+	require.NoError(t, err)
 
 	p, err := Initialize(
 		ctx,
@@ -518,7 +505,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
 			DestroyTimeout:  time.Second,
 			Supervisor:      nil,
 		},
-		AddListeners(listener),
 	)
 	assert.NoError(t, err)
 	assert.NotNil(t, p)
@@ -532,7 +518,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
 	assert.Error(t, err)
 	assert.Nil(t, res)
 
-	<-block
+	<-ch
 
 	p.Destroy(ctx)
 }
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 99af168c..c1fb6eec 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -2,6 +2,7 @@ package pool
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -12,7 +13,10 @@ import (
 	"github.com/spiral/roadrunner/v2/worker"
 )
 
-const MB = 1024 * 1024
+const (
+	MB             = 1024 * 1024
+	supervisorName string = "supervisor"
+)
 
 // NSEC_IN_SEC nanoseconds in second
 const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck
@@ -24,20 +28,23 @@ type Supervised interface {
 }
 
 type supervised struct {
-	cfg    *SupervisorConfig
-	events events.Handler
-	pool   Pool
-	stopCh chan struct{}
-	mu     *sync.RWMutex
+	cfg      *SupervisorConfig
+	events   events.EventBus
+	eventsID string
+	pool     Pool
+	stopCh   chan struct{}
+	mu       *sync.RWMutex
 }
 
-func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised {
+	eb, id := events.Bus()
 	sp := &supervised{
-		cfg:    cfg,
-		events: events,
-		pool:   pool,
-		mu:     &sync.RWMutex{},
-		stopCh: make(chan struct{}),
+		cfg:      cfg,
+		events:   eb,
+		eventsID: id,
+		pool:     pool,
+		mu:       &sync.RWMutex{},
+		stopCh:   make(chan struct{}),
 	}
 
 	return sp
@@ -148,7 +155,11 @@ func (sp *supervised) control() { //nolint:gocognit
 			}
 			// just to double check
 			workers[i].State().Set(worker.StateInvalid)
-			sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
+			sp.events.Send(&events.RREvent{
+				T: events.EventTTL,
+				P: supervisorName,
+				M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+			})
 			continue
 		}
 
@@ -168,7 +179,11 @@ func (sp *supervised) control() { //nolint:gocognit
 			}
 			// just to double check
 			workers[i].State().Set(worker.StateInvalid)
-			sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
+			sp.events.Send(&events.RREvent{
+				T: events.EventMaxMemory,
+				P: supervisorName,
+				M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+			})
 			continue
 		}
 
@@ -223,7 +238,11 @@ func (sp *supervised) control() { //nolint:gocognit
 			}
 			// just to double-check
 			workers[i].State().Set(worker.StateInvalid)
-			sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
+			sp.events.Send(&events.RREvent{
+				T: events.EventIdleTTL,
+				P: supervisorName,
+				M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
+			})
 		}
 	}
 }
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index aca379c6..c3abf85e 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -326,14 +326,10 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
 		},
 	}
 
-	block := make(chan struct{}, 10)
-	listener := func(event interface{}) {
-		if ev, ok := event.(events.PoolEvent); ok {
-			if ev.Event == events.EventMaxMemory {
-				block <- struct{}{}
-			}
-		}
-	}
+	eb, id := events.Bus()
+	ch := make(chan events.Event, 10)
+	err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
+	require.NoError(t, err)
 
 	// constructed
 	// max memory
@@ -344,7 +340,6 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
 		func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
 		pipe.NewPipeFactory(),
 		cfgExecTTL,
-		AddListeners(listener),
 	)
 	assert.NoError(t, err)
@@ -359,7 +354,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
 	assert.Empty(t, resp.Body)
 	assert.Empty(t, resp.Context)
 
-	<-block
+	<-ch
 
 	p.Destroy(context.Background())
 }
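
All four test updates above follow the same recipe: subscribe before constructing the pool, trigger the behavior, then block on the channel. A condensed sketch of that recipe; the waitForEvent helper and its timeout are illustrative, not part of the commit:

package pool

import (
	"testing"
	"time"

	"github.com/spiral/roadrunner/v2/events" // assumed path, as above
	"github.com/stretchr/testify/require"
)

// waitForEvent is a hypothetical test helper condensing the pattern used by
// the updated tests: subscribe first, run the trigger, then wait for delivery.
func waitForEvent(t *testing.T, pattern string, trigger func()) events.Event {
	eb, id := events.Bus()
	defer eb.Unsubscribe(id)

	ch := make(chan events.Event, 10)
	require.NoError(t, eb.SubscribeP(id, pattern, ch))

	trigger()

	select {
	case ev := <-ch:
		return ev
	case <-time.After(time.Minute):
		t.Fatal("no event received")
		return nil
	}
}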