diff options
author | Valery Piashchynski <[email protected]> | 2021-10-27 22:42:07 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-10-27 22:42:07 +0300 |
commit | 52a6b1b2fc3eaf3cda5594825f3c5a9ae8a9452b (patch) | |
tree | 296d48dab1d5396313c061fbfc930a97af0df9f4 /pool | |
parent | 15e6f474ef1340f4e93f213bab1cb9548e51a1e8 (diff) |
Make sure events bus properly closed
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pool')
-rwxr-xr-x | pool/static_pool.go | 56 | ||||
-rwxr-xr-x | pool/static_pool_test.go | 3 | ||||
-rwxr-xr-x | pool/supervisor_pool.go | 43 | ||||
-rw-r--r-- | pool/supervisor_test.go | 1 |
4 files changed, 28 insertions, 75 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go index 27db830c..11112e72 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -99,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { - sp := supervisorWrapper(p, p.cfg.Supervisor) + sp := supervisorWrapper(p, eb, p.cfg.Supervisor) // start watcher timer sp.Start() return sp, nil @@ -195,11 +195,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { w.State().Set(worker.StateInvalid) err := w.Stop() if err != nil { - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()))) } } @@ -221,11 +217,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Send(&events.RREvent{ - T: events.EventNoFreeWorkers, - P: pluginName, - M: fmt.Sprintf("error: %s", err), - }) + sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("error: %s", err))) return nil, errors.E(op, err) } // else if err not nil - return error @@ -245,20 +237,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - sp.events.Send(&events.RREvent{ - T: events.EventExecTTL, - P: pluginName, - M: fmt.Sprintf("error: %s", err), - }) + sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("error: %s", err))) w.State().Set(worker.StateInvalid) return nil, err case errors.Is(errors.SoftJob, err): - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // if max jobs exceed if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { @@ -279,11 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.Network, err): // in case of network error, we can't stop the worker, we should kill it w.State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // kill the worker instead of sending net packet to it _ = w.Kill() @@ -291,11 +271,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return nil, err default: w.State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventWorkerDestruct, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // stop the worker, worker here might be in the broken state (network) errS := w.Stop() if errS != nil { @@ -319,11 +295,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio // wrap sync worker sw := worker.From(w) - sp.events.Send(&events.RREvent{ - T: events.EventWorkerConstruct, - P: pluginName, - M: fmt.Sprintf("pid: %d", sw.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("pid: %d", sw.Pid()))) return sw, nil } } @@ -345,11 +317,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { sw.State().Set(worker.StateDestroyed) err = sw.Kill() if err != nil { - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) return nil, err } @@ -366,11 +334,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // redirect call to the worker with TTL r, err := sw.ExecWithTTL(ctx, p) if stopErr := sw.Stop(); stopErr != nil { - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) } return r, err diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index abef3779..717d301e 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -170,6 +170,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -201,6 +202,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { // Run pool events eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch) require.NoError(t, err) @@ -489,6 +491,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx := context.Background() eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch) require.NoError(t, err) diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index c1fb6eec..1a94f6a0 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -28,23 +28,20 @@ type Supervised interface { } type supervised struct { - cfg *SupervisorConfig - events events.EventBus - eventsID string - pool Pool - stopCh chan struct{} - mu *sync.RWMutex + cfg *SupervisorConfig + events events.EventBus + pool Pool + stopCh chan struct{} + mu *sync.RWMutex } -func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised { - eb, id := events.Bus() +func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised { sp := &supervised{ - cfg: cfg, - events: eb, - eventsID: id, - pool: pool, - mu: &sync.RWMutex{}, - stopCh: make(chan struct{}), + cfg: cfg, + events: eb, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), } return sp @@ -155,11 +152,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventTTL, - P: supervisorName, - M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), - }) + sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) continue } @@ -179,11 +172,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventMaxMemory, - P: supervisorName, - M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), - }) + sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) continue } @@ -238,11 +227,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double-check workers[i].State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventIdleTTL, - P: supervisorName, - M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), - }) + sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) } } } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index 9c0bfdaa..eb3c37dd 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -332,6 +332,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { } eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch) require.NoError(t, err) |