diff options
author | Valery Piashchynski <[email protected]> | 2021-10-27 22:42:07 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-10-27 22:42:07 +0300 |
commit | 52a6b1b2fc3eaf3cda5594825f3c5a9ae8a9452b (patch) | |
tree | 296d48dab1d5396313c061fbfc930a97af0df9f4 /events | |
parent | 15e6f474ef1340f4e93f213bab1cb9548e51a1e8 (diff) |
Make sure events bus properly closed
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'events')
-rw-r--r-- | events/events.go | 143 | ||||
-rw-r--r-- | events/events_test.go | 6 | ||||
-rw-r--r-- | events/types.go | 165 |
3 files changed, 157 insertions, 157 deletions
diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..b7396653 --- /dev/null +++ b/events/events.go @@ -0,0 +1,143 @@ +package events + +type EventType uint32 + +const ( + // EventUnaryCallOk represents success unary call response + EventUnaryCallOk EventType = iota + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr + + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipePaused when pipeline has been paused. + EventPipePaused + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady + + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventWorkerProcessExit triggered on process wait exit + EventWorkerProcessExit + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart + + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr + // EventWorkerWaitExit is the worker exit event + EventWorkerWaitExit +) + +func (et EventType) String() string { + switch et { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventDriverReady: + return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" + + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + + case EventWorkerProcessExit: + return "EventWorkerProcessExit" + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" + + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" + case EventWorkerWaitExit: + return "EventWorkerWaitExit" + + default: + return "UnknownEventType" + } +} diff --git a/events/events_test.go b/events/events_test.go index 6d6e90a2..e15c55d6 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -15,7 +15,7 @@ func TestEvenHandler(t *testing.T) { err := eh.SubscribeP(id, "http.EventJobOK", ch) require.NoError(t, err) - eh.Send(NewRREvent(EventJobOK, "foo", "http")) + eh.Send(NewEvent(EventJobOK, "http", "foo")) evt := <-ch require.Equal(t, "foo", evt.Message()) @@ -37,7 +37,7 @@ func TestEvenHandler2(t *testing.T) { err = eh.SubscribeP(id, "http.EventJobOK", ch2) require.NoError(t, err) - eh.Send(NewRREvent(EventJobOK, "foo", "http")) + eh.Send(NewEvent(EventJobOK, "http", "foo")) evt := <-ch2 require.Equal(t, "foo", evt.Message()) @@ -85,7 +85,7 @@ func TestEvenHandler5(t *testing.T) { err := eh.SubscribeP(id, "http.EventJobOK", ch) require.NoError(t, err) - eh.Send(NewRREvent(EventJobOK, "foo", "http")) + eh.Send(NewEvent(EventJobOK, "http", "foo")) evt := <-ch require.Equal(t, "foo", evt.Message()) diff --git a/events/types.go b/events/types.go index d8e40084..65a76d15 100644 --- a/events/types.go +++ b/events/types.go @@ -17,173 +17,30 @@ type Event interface { type RREvent struct { // event typ - T EventType + typ EventType // plugin - P string + plugin string // message - M string + message string } -// NewRREvent initializes new event -func NewRREvent(t EventType, msg string, plugin string) *RREvent { - // get +// NewEvent initializes new event +func NewEvent(t EventType, plugin string, msg string) *RREvent { return &RREvent{ - T: t, - P: plugin, - M: msg, + typ: t, + plugin: plugin, + message: msg, } } func (r *RREvent) Type() EventType { - return r.T + return r.typ } func (r *RREvent) Message() string { - return r.M + return r.message } func (r *RREvent) Plugin() string { - return r.P -} - -type EventType uint32 - -const ( - // EventUnaryCallOk represents success unary call response - EventUnaryCallOk EventType = iota - - // EventUnaryCallErr raised when unary call ended with error - EventUnaryCallErr - - // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipePaused when pipeline has been paused. - EventPipePaused - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventDriverReady thrown when broken is ready to accept/serve tasks. - EventDriverReady - - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventWorkerProcessExit triggered on process wait exit - EventWorkerProcessExit - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL - - // EventPoolRestart triggered when pool restart is needed - EventPoolRestart - - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog - // EventWorkerStderr is the worker standard error output - EventWorkerStderr - // EventWorkerWaitExit is the worker exit event - EventWorkerWaitExit -) - -func (et EventType) String() string { - switch et { - case EventPushOK: - return "EventPushOK" - case EventPushError: - return "EventPushError" - case EventJobStart: - return "EventJobStart" - case EventJobOK: - return "EventJobOK" - case EventJobError: - return "EventJobError" - case EventPipeActive: - return "EventPipeActive" - case EventPipeStopped: - return "EventPipeStopped" - case EventPipeError: - return "EventPipeError" - case EventDriverReady: - return "EventDriverReady" - case EventPipePaused: - return "EventPipePaused" - - case EventUnaryCallOk: - return "EventUnaryCallOk" - case EventUnaryCallErr: - return "EventUnaryCallErr" - - case EventWorkerProcessExit: - return "EventWorkerProcessExit" - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventSupervisorError: - return "EventSupervisorError" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - case EventPoolRestart: - return "EventPoolRestart" - - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - case EventWorkerStderr: - return "EventWorkerStderr" - case EventWorkerWaitExit: - return "EventWorkerWaitExit" - - default: - return "UnknownEventType" - } + return r.plugin } |