diff options
author | Valery Piashchynski <[email protected]> | 2021-10-27 22:42:07 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-10-27 22:42:07 +0300 |
commit | 52a6b1b2fc3eaf3cda5594825f3c5a9ae8a9452b (patch) | |
tree | 296d48dab1d5396313c061fbfc930a97af0df9f4 | |
parent | 15e6f474ef1340f4e93f213bab1cb9548e51a1e8 (diff) |
Make sure events bus properly closed
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | events/events.go | 143 | ||||
-rw-r--r-- | events/events_test.go | 6 | ||||
-rw-r--r-- | events/types.go | 165 | ||||
-rwxr-xr-x | pool/static_pool.go | 56 | ||||
-rwxr-xr-x | pool/static_pool_test.go | 3 | ||||
-rwxr-xr-x | pool/supervisor_pool.go | 43 | ||||
-rw-r--r-- | pool/supervisor_test.go | 1 | ||||
-rw-r--r-- | transport/pipe/pipe_factory_spawn_test.go | 2 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 2 | ||||
-rw-r--r-- | transport/socket/socket_factory_spawn_test.go | 4 | ||||
-rwxr-xr-x | transport/socket/socket_factory_test.go | 4 | ||||
-rwxr-xr-x | worker/worker.go | 10 | ||||
-rwxr-xr-x | worker_watcher/worker_watcher.go | 30 |
13 files changed, 204 insertions, 265 deletions
diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..b7396653 --- /dev/null +++ b/events/events.go @@ -0,0 +1,143 @@ +package events + +type EventType uint32 + +const ( + // EventUnaryCallOk represents success unary call response + EventUnaryCallOk EventType = iota + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr + + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipePaused when pipeline has been paused. + EventPipePaused + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady + + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventWorkerProcessExit triggered on process wait exit + EventWorkerProcessExit + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart + + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr + // EventWorkerWaitExit is the worker exit event + EventWorkerWaitExit +) + +func (et EventType) String() string { + switch et { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventDriverReady: + return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" + + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + + case EventWorkerProcessExit: + return "EventWorkerProcessExit" + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" + + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" + case EventWorkerWaitExit: + return "EventWorkerWaitExit" + + default: + return "UnknownEventType" + } +} diff --git a/events/events_test.go b/events/events_test.go index 6d6e90a2..e15c55d6 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -15,7 +15,7 @@ func TestEvenHandler(t *testing.T) { err := eh.SubscribeP(id, "http.EventJobOK", ch) require.NoError(t, err) - eh.Send(NewRREvent(EventJobOK, "foo", "http")) + eh.Send(NewEvent(EventJobOK, "http", "foo")) evt := <-ch require.Equal(t, "foo", evt.Message()) @@ -37,7 +37,7 @@ func TestEvenHandler2(t *testing.T) { err = eh.SubscribeP(id, "http.EventJobOK", ch2) require.NoError(t, err) - eh.Send(NewRREvent(EventJobOK, "foo", "http")) + eh.Send(NewEvent(EventJobOK, "http", "foo")) evt := <-ch2 require.Equal(t, "foo", evt.Message()) @@ -85,7 +85,7 @@ func TestEvenHandler5(t *testing.T) { err := eh.SubscribeP(id, "http.EventJobOK", ch) require.NoError(t, err) - eh.Send(NewRREvent(EventJobOK, "foo", "http")) + eh.Send(NewEvent(EventJobOK, "http", "foo")) evt := <-ch require.Equal(t, "foo", evt.Message()) diff --git a/events/types.go b/events/types.go index d8e40084..65a76d15 100644 --- a/events/types.go +++ b/events/types.go @@ -17,173 +17,30 @@ type Event interface { type RREvent struct { // event typ - T EventType + typ EventType // plugin - P string + plugin string // message - M string + message string } -// NewRREvent initializes new event -func NewRREvent(t EventType, msg string, plugin string) *RREvent { - // get +// NewEvent initializes new event +func NewEvent(t EventType, plugin string, msg string) *RREvent { return &RREvent{ - T: t, - P: plugin, - M: msg, + typ: t, + plugin: plugin, + message: msg, } } func (r *RREvent) Type() EventType { - return r.T + return r.typ } func (r *RREvent) Message() string { - return r.M + return r.message } func (r *RREvent) Plugin() string { - return r.P -} - -type EventType uint32 - -const ( - // EventUnaryCallOk represents success unary call response - EventUnaryCallOk EventType = iota - - // EventUnaryCallErr raised when unary call ended with error - EventUnaryCallErr - - // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipePaused when pipeline has been paused. - EventPipePaused - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventDriverReady thrown when broken is ready to accept/serve tasks. - EventDriverReady - - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventWorkerProcessExit triggered on process wait exit - EventWorkerProcessExit - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL - - // EventPoolRestart triggered when pool restart is needed - EventPoolRestart - - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog - // EventWorkerStderr is the worker standard error output - EventWorkerStderr - // EventWorkerWaitExit is the worker exit event - EventWorkerWaitExit -) - -func (et EventType) String() string { - switch et { - case EventPushOK: - return "EventPushOK" - case EventPushError: - return "EventPushError" - case EventJobStart: - return "EventJobStart" - case EventJobOK: - return "EventJobOK" - case EventJobError: - return "EventJobError" - case EventPipeActive: - return "EventPipeActive" - case EventPipeStopped: - return "EventPipeStopped" - case EventPipeError: - return "EventPipeError" - case EventDriverReady: - return "EventDriverReady" - case EventPipePaused: - return "EventPipePaused" - - case EventUnaryCallOk: - return "EventUnaryCallOk" - case EventUnaryCallErr: - return "EventUnaryCallErr" - - case EventWorkerProcessExit: - return "EventWorkerProcessExit" - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventSupervisorError: - return "EventSupervisorError" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - case EventPoolRestart: - return "EventPoolRestart" - - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - case EventWorkerStderr: - return "EventWorkerStderr" - case EventWorkerWaitExit: - return "EventWorkerWaitExit" - - default: - return "UnknownEventType" - } + return r.plugin } diff --git a/pool/static_pool.go b/pool/static_pool.go index 27db830c..11112e72 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -99,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // if supervised config not nil, guess, that pool wanted to be supervised if cfg.Supervisor != nil { - sp := supervisorWrapper(p, p.cfg.Supervisor) + sp := supervisorWrapper(p, eb, p.cfg.Supervisor) // start watcher timer sp.Start() return sp, nil @@ -195,11 +195,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { w.State().Set(worker.StateInvalid) err := w.Stop() if err != nil { - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()))) } } @@ -221,11 +217,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { - sp.events.Send(&events.RREvent{ - T: events.EventNoFreeWorkers, - P: pluginName, - M: fmt.Sprintf("error: %s", err), - }) + sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("error: %s", err))) return nil, errors.E(op, err) } // else if err not nil - return error @@ -245,20 +237,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): - sp.events.Send(&events.RREvent{ - T: events.EventExecTTL, - P: pluginName, - M: fmt.Sprintf("error: %s", err), - }) + sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("error: %s", err))) w.State().Set(worker.StateInvalid) return nil, err case errors.Is(errors.SoftJob, err): - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // if max jobs exceed if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { @@ -279,11 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { case errors.Is(errors.Network, err): // in case of network error, we can't stop the worker, we should kill it w.State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // kill the worker instead of sending net packet to it _ = w.Kill() @@ -291,11 +271,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return nil, err default: w.State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventWorkerDestruct, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid()))) // stop the worker, worker here might be in the broken state (network) errS := w.Stop() if errS != nil { @@ -319,11 +295,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio // wrap sync worker sw := worker.From(w) - sp.events.Send(&events.RREvent{ - T: events.EventWorkerConstruct, - P: pluginName, - M: fmt.Sprintf("pid: %d", sw.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("pid: %d", sw.Pid()))) return sw, nil } } @@ -345,11 +317,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { sw.State().Set(worker.StateDestroyed) err = sw.Kill() if err != nil { - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) return nil, err } @@ -366,11 +334,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // redirect call to the worker with TTL r, err := sw.ExecWithTTL(ctx, p) if stopErr := sw.Stop(); stopErr != nil { - sp.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: pluginName, - M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()), - }) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()))) } return r, err diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index abef3779..717d301e 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -170,6 +170,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -201,6 +202,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { // Run pool events eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch) require.NoError(t, err) @@ -489,6 +491,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx := context.Background() eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch) require.NoError(t, err) diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go index c1fb6eec..1a94f6a0 100755 --- a/pool/supervisor_pool.go +++ b/pool/supervisor_pool.go @@ -28,23 +28,20 @@ type Supervised interface { } type supervised struct { - cfg *SupervisorConfig - events events.EventBus - eventsID string - pool Pool - stopCh chan struct{} - mu *sync.RWMutex + cfg *SupervisorConfig + events events.EventBus + pool Pool + stopCh chan struct{} + mu *sync.RWMutex } -func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised { - eb, id := events.Bus() +func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised { sp := &supervised{ - cfg: cfg, - events: eb, - eventsID: id, - pool: pool, - mu: &sync.RWMutex{}, - stopCh: make(chan struct{}), + cfg: cfg, + events: eb, + pool: pool, + mu: &sync.RWMutex{}, + stopCh: make(chan struct{}), } return sp @@ -155,11 +152,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventTTL, - P: supervisorName, - M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), - }) + sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) continue } @@ -179,11 +172,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double check workers[i].State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventMaxMemory, - P: supervisorName, - M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), - }) + sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) continue } @@ -238,11 +227,7 @@ func (sp *supervised) control() { //nolint:gocognit } // just to double-check workers[i].State().Set(worker.StateInvalid) - sp.events.Send(&events.RREvent{ - T: events.EventIdleTTL, - P: supervisorName, - M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()), - }) + sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid()))) } } } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index 9c0bfdaa..eb3c37dd 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -332,6 +332,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { } eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch) require.NoError(t, err) diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go index 81004027..256176de 100644 --- a/transport/pipe/pipe_factory_spawn_test.go +++ b/transport/pipe/pipe_factory_spawn_test.go @@ -108,6 +108,7 @@ func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -370,6 +371,7 @@ func Test_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index 8c6d440a..0f527cd5 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -127,6 +127,7 @@ func Test_Pipe_Failboot(t *testing.T) { ctx := context.Background() eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -434,6 +435,7 @@ func Test_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go index 45fb3bd5..2db2fd40 100644 --- a/transport/socket/socket_factory_spawn_test.go +++ b/transport/socket/socket_factory_spawn_test.go @@ -112,6 +112,7 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -162,6 +163,7 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -271,6 +273,7 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -327,6 +330,7 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go index 11b34999..7b28a847 100755 --- a/transport/socket/socket_factory_test.go +++ b/transport/socket/socket_factory_test.go @@ -126,6 +126,7 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -203,6 +204,7 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -366,6 +368,7 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) @@ -440,6 +443,7 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") eb, id := events.Bus() + defer eb.Unsubscribe(id) ch := make(chan events.Event, 10) err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) require.NoError(t, err) diff --git a/worker/worker.go b/worker/worker.go index 5973adc6..05c6dd0d 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -135,6 +135,7 @@ func (w *Process) Wait() error { const op = errors.Op("process_wait") var err error err = w.cmd.Wait() + defer w.events.Unsubscribe(w.eventsID) // If worker was destroyed, just exit if w.State().Value() == StateDestroyed { @@ -162,8 +163,6 @@ func (w *Process) Wait() error { return nil } - w.events.Unsubscribe(w.eventsID) - return err } @@ -221,11 +220,6 @@ func (w *Process) Kill() error { // Worker stderr func (w *Process) Write(p []byte) (n int, err error) { - w.events.Send(&events.RREvent{ - T: events.EventWorkerStderr, - P: workerEventsName, - M: utils.AsString(p), - }) - + w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p))) return len(p), nil } diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go index 871e6146..d425994e 100755 --- a/worker_watcher/worker_watcher.go +++ b/worker_watcher/worker_watcher.go @@ -148,11 +148,7 @@ func (ww *workerWatcher) Allocate() error { sw, err := ww.allocator() if err != nil { // log incident - ww.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: wwName, - M: fmt.Sprintf("can't allocate the worker: %v", err), - }) + ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err))) // if no timeout, return error immediately if ww.allocateTimeout == 0 { @@ -176,11 +172,7 @@ func (ww *workerWatcher) Allocate() error { sw, err = ww.allocator() if err != nil { // log incident - ww.events.Send(&events.RREvent{ - T: events.EventWorkerError, - P: wwName, - M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err), - }) + ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err))) continue } @@ -287,11 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { - ww.events.Send(&events.RREvent{ - T: events.EventWorkerWaitExit, - P: wwName, - M: fmt.Sprintf("error: %v", err), - }) + ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err))) } // remove worker @@ -299,11 +287,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace - ww.events.Send(&events.RREvent{ - T: events.EventWorkerDestruct, - P: wwName, - M: fmt.Sprintf("pid: %d", w.Pid()), - }) + ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid()))) return } @@ -313,11 +297,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { err = ww.Allocate() if err != nil { - ww.events.Send(&events.RREvent{ - T: events.EventWorkerProcessExit, - P: wwName, - M: fmt.Sprintf("error: %v", err), - }) + ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err))) // no workers at all, panic if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 { |