summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-27 22:42:07 +0300
committerValery Piashchynski <[email protected]>2021-10-27 22:42:07 +0300
commit52a6b1b2fc3eaf3cda5594825f3c5a9ae8a9452b (patch)
tree296d48dab1d5396313c061fbfc930a97af0df9f4
parent15e6f474ef1340f4e93f213bab1cb9548e51a1e8 (diff)
Make sure events bus properly closed
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--events/events.go143
-rw-r--r--events/events_test.go6
-rw-r--r--events/types.go165
-rwxr-xr-xpool/static_pool.go56
-rwxr-xr-xpool/static_pool_test.go3
-rwxr-xr-xpool/supervisor_pool.go43
-rw-r--r--pool/supervisor_test.go1
-rw-r--r--transport/pipe/pipe_factory_spawn_test.go2
-rwxr-xr-xtransport/pipe/pipe_factory_test.go2
-rw-r--r--transport/socket/socket_factory_spawn_test.go4
-rwxr-xr-xtransport/socket/socket_factory_test.go4
-rwxr-xr-xworker/worker.go10
-rwxr-xr-xworker_watcher/worker_watcher.go30
13 files changed, 204 insertions, 265 deletions
diff --git a/events/events.go b/events/events.go
new file mode 100644
index 00000000..b7396653
--- /dev/null
+++ b/events/events.go
@@ -0,0 +1,143 @@
+package events
+
+type EventType uint32
+
+const (
+ // EventUnaryCallOk represents success unary call response
+ EventUnaryCallOk EventType = iota
+
+ // EventUnaryCallErr raised when unary call ended with error
+ EventUnaryCallErr
+
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipePaused when pipeline has been paused.
+ EventPipePaused
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventDriverReady thrown when broken is ready to accept/serve tasks.
+ EventDriverReady
+
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventWorkerProcessExit triggered on process wait exit
+ EventWorkerProcessExit
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+
+ // EventPoolRestart triggered when pool restart is needed
+ EventPoolRestart
+
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+ // EventWorkerStderr is the worker standard error output
+ EventWorkerStderr
+ // EventWorkerWaitExit is the worker exit event
+ EventWorkerWaitExit
+)
+
+func (et EventType) String() string {
+ switch et {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventDriverReady:
+ return "EventDriverReady"
+ case EventPipePaused:
+ return "EventPipePaused"
+
+ case EventUnaryCallOk:
+ return "EventUnaryCallOk"
+ case EventUnaryCallErr:
+ return "EventUnaryCallErr"
+
+ case EventWorkerProcessExit:
+ return "EventWorkerProcessExit"
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ case EventPoolRestart:
+ return "EventPoolRestart"
+
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ case EventWorkerStderr:
+ return "EventWorkerStderr"
+ case EventWorkerWaitExit:
+ return "EventWorkerWaitExit"
+
+ default:
+ return "UnknownEventType"
+ }
+}
diff --git a/events/events_test.go b/events/events_test.go
index 6d6e90a2..e15c55d6 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -15,7 +15,7 @@ func TestEvenHandler(t *testing.T) {
err := eh.SubscribeP(id, "http.EventJobOK", ch)
require.NoError(t, err)
- eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
evt := <-ch
require.Equal(t, "foo", evt.Message())
@@ -37,7 +37,7 @@ func TestEvenHandler2(t *testing.T) {
err = eh.SubscribeP(id, "http.EventJobOK", ch2)
require.NoError(t, err)
- eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
evt := <-ch2
require.Equal(t, "foo", evt.Message())
@@ -85,7 +85,7 @@ func TestEvenHandler5(t *testing.T) {
err := eh.SubscribeP(id, "http.EventJobOK", ch)
require.NoError(t, err)
- eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
evt := <-ch
require.Equal(t, "foo", evt.Message())
diff --git a/events/types.go b/events/types.go
index d8e40084..65a76d15 100644
--- a/events/types.go
+++ b/events/types.go
@@ -17,173 +17,30 @@ type Event interface {
type RREvent struct {
// event typ
- T EventType
+ typ EventType
// plugin
- P string
+ plugin string
// message
- M string
+ message string
}
-// NewRREvent initializes new event
-func NewRREvent(t EventType, msg string, plugin string) *RREvent {
- // get
+// NewEvent initializes new event
+func NewEvent(t EventType, plugin string, msg string) *RREvent {
return &RREvent{
- T: t,
- P: plugin,
- M: msg,
+ typ: t,
+ plugin: plugin,
+ message: msg,
}
}
func (r *RREvent) Type() EventType {
- return r.T
+ return r.typ
}
func (r *RREvent) Message() string {
- return r.M
+ return r.message
}
func (r *RREvent) Plugin() string {
- return r.P
-}
-
-type EventType uint32
-
-const (
- // EventUnaryCallOk represents success unary call response
- EventUnaryCallOk EventType = iota
-
- // EventUnaryCallErr raised when unary call ended with error
- EventUnaryCallErr
-
- // EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK
-
- // EventPushError caused when job can not be registered.
- EventPushError
-
- // EventJobStart thrown when new job received.
- EventJobStart
-
- // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
- EventJobOK
-
- // EventJobError thrown on all job related errors. See JobError as context.
- EventJobError
-
- // EventPipeActive when pipeline has started.
- EventPipeActive
-
- // EventPipeStopped when pipeline has been stopped.
- EventPipeStopped
-
- // EventPipePaused when pipeline has been paused.
- EventPipePaused
-
- // EventPipeError when pipeline specific error happen.
- EventPipeError
-
- // EventDriverReady thrown when broken is ready to accept/serve tasks.
- EventDriverReady
-
- // EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct
-
- // EventWorkerDestruct thrown after worker destruction.
- EventWorkerDestruct
-
- // EventSupervisorError triggered when supervisor can not complete work.
- EventSupervisorError
-
- // EventWorkerProcessExit triggered on process wait exit
- EventWorkerProcessExit
-
- // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
- EventNoFreeWorkers
-
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
-
- // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
- EventTTL
-
- // EventIdleTTL triggered when worker spends too much time at rest.
- EventIdleTTL
-
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventExecTTL
-
- // EventPoolRestart triggered when pool restart is needed
- EventPoolRestart
-
- // EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError
- // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
- EventWorkerLog
- // EventWorkerStderr is the worker standard error output
- EventWorkerStderr
- // EventWorkerWaitExit is the worker exit event
- EventWorkerWaitExit
-)
-
-func (et EventType) String() string {
- switch et {
- case EventPushOK:
- return "EventPushOK"
- case EventPushError:
- return "EventPushError"
- case EventJobStart:
- return "EventJobStart"
- case EventJobOK:
- return "EventJobOK"
- case EventJobError:
- return "EventJobError"
- case EventPipeActive:
- return "EventPipeActive"
- case EventPipeStopped:
- return "EventPipeStopped"
- case EventPipeError:
- return "EventPipeError"
- case EventDriverReady:
- return "EventDriverReady"
- case EventPipePaused:
- return "EventPipePaused"
-
- case EventUnaryCallOk:
- return "EventUnaryCallOk"
- case EventUnaryCallErr:
- return "EventUnaryCallErr"
-
- case EventWorkerProcessExit:
- return "EventWorkerProcessExit"
- case EventWorkerConstruct:
- return "EventWorkerConstruct"
- case EventWorkerDestruct:
- return "EventWorkerDestruct"
- case EventSupervisorError:
- return "EventSupervisorError"
- case EventNoFreeWorkers:
- return "EventNoFreeWorkers"
- case EventMaxMemory:
- return "EventMaxMemory"
- case EventTTL:
- return "EventTTL"
- case EventIdleTTL:
- return "EventIdleTTL"
- case EventExecTTL:
- return "EventExecTTL"
- case EventPoolRestart:
- return "EventPoolRestart"
-
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
- case EventWorkerStderr:
- return "EventWorkerStderr"
- case EventWorkerWaitExit:
- return "EventWorkerWaitExit"
-
- default:
- return "UnknownEventType"
- }
+ return r.plugin
}
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 27db830c..11112e72 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -99,7 +99,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
- sp := supervisorWrapper(p, p.cfg.Supervisor)
+ sp := supervisorWrapper(p, eb, p.cfg.Supervisor)
// start watcher timer
sp.Start()
return sp, nil
@@ -195,11 +195,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, pid: %d", err.Error(), w.Pid())))
}
}
@@ -221,11 +217,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Send(&events.RREvent{
- T: events.EventNoFreeWorkers,
- P: pluginName,
- M: fmt.Sprintf("error: %s", err),
- })
+ sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("error: %s", err)))
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -245,20 +237,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Send(&events.RREvent{
- T: events.EventExecTTL,
- P: pluginName,
- M: fmt.Sprintf("error: %s", err),
- })
+ sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("error: %s", err)))
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
@@ -279,11 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
// kill the worker instead of sending net packet to it
_ = w.Kill()
@@ -291,11 +271,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return nil, err
default:
w.State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerDestruct,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, w.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, pid: %d", err, w.Pid())))
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
@@ -319,11 +295,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// wrap sync worker
sw := worker.From(w)
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerConstruct,
- P: pluginName,
- M: fmt.Sprintf("pid: %d", sw.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("pid: %d", sw.Pid())))
return sw, nil
}
}
@@ -345,11 +317,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
sw.State().Set(worker.StateDestroyed)
err = sw.Kill()
if err != nil {
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
return nil, err
}
@@ -366,11 +334,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// redirect call to the worker with TTL
r, err := sw.ExecWithTTL(ctx, p)
if stopErr := sw.Stop(); stopErr != nil {
- sp.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: pluginName,
- M: fmt.Sprintf("error: %s, pid: %d", err, sw.Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, pid: %d", err, sw.Pid())))
}
return r, err
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index abef3779..717d301e 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -170,6 +170,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -201,6 +202,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
// Run pool events
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
require.NoError(t, err)
@@ -489,6 +491,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx := context.Background()
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
require.NoError(t, err)
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index c1fb6eec..1a94f6a0 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -28,23 +28,20 @@ type Supervised interface {
}
type supervised struct {
- cfg *SupervisorConfig
- events events.EventBus
- eventsID string
- pool Pool
- stopCh chan struct{}
- mu *sync.RWMutex
+ cfg *SupervisorConfig
+ events events.EventBus
+ pool Pool
+ stopCh chan struct{}
+ mu *sync.RWMutex
}
-func supervisorWrapper(pool Pool, cfg *SupervisorConfig) Supervised {
- eb, id := events.Bus()
+func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised {
sp := &supervised{
- cfg: cfg,
- events: eb,
- eventsID: id,
- pool: pool,
- mu: &sync.RWMutex{},
- stopCh: make(chan struct{}),
+ cfg: cfg,
+ events: eb,
+ pool: pool,
+ mu: &sync.RWMutex{},
+ stopCh: make(chan struct{}),
}
return sp
@@ -155,11 +152,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventTTL,
- P: supervisorName,
- M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
continue
}
@@ -179,11 +172,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventMaxMemory,
- P: supervisorName,
- M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
continue
}
@@ -238,11 +227,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(&events.RREvent{
- T: events.EventIdleTTL,
- P: supervisorName,
- M: fmt.Sprintf("worker's pid: %d", workers[i].Pid()),
- })
+ sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("worker's pid: %d", workers[i].Pid())))
}
}
}
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 9c0bfdaa..eb3c37dd 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -332,6 +332,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
}
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
require.NoError(t, err)
diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go
index 81004027..256176de 100644
--- a/transport/pipe/pipe_factory_spawn_test.go
+++ b/transport/pipe/pipe_factory_spawn_test.go
@@ -108,6 +108,7 @@ func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -370,6 +371,7 @@ func Test_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index 8c6d440a..0f527cd5 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -127,6 +127,7 @@ func Test_Pipe_Failboot(t *testing.T) {
ctx := context.Background()
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -434,6 +435,7 @@ func Test_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go
index 45fb3bd5..2db2fd40 100644
--- a/transport/socket/socket_factory_spawn_test.go
+++ b/transport/socket/socket_factory_spawn_test.go
@@ -112,6 +112,7 @@ func Test_Tcp_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -162,6 +163,7 @@ func Test_Tcp_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -271,6 +273,7 @@ func Test_Unix_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -327,6 +330,7 @@ func Test_Unix_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index 11b34999..7b28a847 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -126,6 +126,7 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -203,6 +204,7 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -366,6 +368,7 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
@@ -440,6 +443,7 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
ch := make(chan events.Event, 10)
err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
require.NoError(t, err)
diff --git a/worker/worker.go b/worker/worker.go
index 5973adc6..05c6dd0d 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -135,6 +135,7 @@ func (w *Process) Wait() error {
const op = errors.Op("process_wait")
var err error
err = w.cmd.Wait()
+ defer w.events.Unsubscribe(w.eventsID)
// If worker was destroyed, just exit
if w.State().Value() == StateDestroyed {
@@ -162,8 +163,6 @@ func (w *Process) Wait() error {
return nil
}
- w.events.Unsubscribe(w.eventsID)
-
return err
}
@@ -221,11 +220,6 @@ func (w *Process) Kill() error {
// Worker stderr
func (w *Process) Write(p []byte) (n int, err error) {
- w.events.Send(&events.RREvent{
- T: events.EventWorkerStderr,
- P: workerEventsName,
- M: utils.AsString(p),
- })
-
+ w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p)))
return len(p), nil
}
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index 871e6146..d425994e 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -148,11 +148,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err := ww.allocator()
if err != nil {
// log incident
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: wwName,
- M: fmt.Sprintf("can't allocate the worker: %v", err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err)))
// if no timeout, return error immediately
if ww.allocateTimeout == 0 {
@@ -176,11 +172,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err = ww.allocator()
if err != nil {
// log incident
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerError,
- P: wwName,
- M: fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err)))
continue
}
@@ -287,11 +279,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerWaitExit,
- P: wwName,
- M: fmt.Sprintf("error: %v", err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("error: %v", err)))
}
// remove worker
@@ -299,11 +287,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerDestruct,
- P: wwName,
- M: fmt.Sprintf("pid: %d", w.Pid()),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("pid: %d", w.Pid())))
return
}
@@ -313,11 +297,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
err = ww.Allocate()
if err != nil {
- ww.events.Send(&events.RREvent{
- T: events.EventWorkerProcessExit,
- P: wwName,
- M: fmt.Sprintf("error: %v", err),
- })
+ ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("error: %v", err)))
// no workers at all, panic
if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {