summaryrefslogtreecommitdiff
path: root/events
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-27 22:42:07 +0300
committerValery Piashchynski <[email protected]>2021-10-27 22:42:07 +0300
commit52a6b1b2fc3eaf3cda5594825f3c5a9ae8a9452b (patch)
tree296d48dab1d5396313c061fbfc930a97af0df9f4 /events
parent15e6f474ef1340f4e93f213bab1cb9548e51a1e8 (diff)
Make sure events bus properly closed
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'events')
-rw-r--r--events/events.go143
-rw-r--r--events/events_test.go6
-rw-r--r--events/types.go165
3 files changed, 157 insertions, 157 deletions
diff --git a/events/events.go b/events/events.go
new file mode 100644
index 00000000..b7396653
--- /dev/null
+++ b/events/events.go
@@ -0,0 +1,143 @@
+package events
+
+type EventType uint32
+
+const (
+ // EventUnaryCallOk represents success unary call response
+ EventUnaryCallOk EventType = iota
+
+ // EventUnaryCallErr raised when unary call ended with error
+ EventUnaryCallErr
+
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipePaused when pipeline has been paused.
+ EventPipePaused
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventDriverReady thrown when broken is ready to accept/serve tasks.
+ EventDriverReady
+
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventWorkerProcessExit triggered on process wait exit
+ EventWorkerProcessExit
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+
+ // EventPoolRestart triggered when pool restart is needed
+ EventPoolRestart
+
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+ // EventWorkerStderr is the worker standard error output
+ EventWorkerStderr
+ // EventWorkerWaitExit is the worker exit event
+ EventWorkerWaitExit
+)
+
+func (et EventType) String() string {
+ switch et {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventDriverReady:
+ return "EventDriverReady"
+ case EventPipePaused:
+ return "EventPipePaused"
+
+ case EventUnaryCallOk:
+ return "EventUnaryCallOk"
+ case EventUnaryCallErr:
+ return "EventUnaryCallErr"
+
+ case EventWorkerProcessExit:
+ return "EventWorkerProcessExit"
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ case EventPoolRestart:
+ return "EventPoolRestart"
+
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ case EventWorkerStderr:
+ return "EventWorkerStderr"
+ case EventWorkerWaitExit:
+ return "EventWorkerWaitExit"
+
+ default:
+ return "UnknownEventType"
+ }
+}
diff --git a/events/events_test.go b/events/events_test.go
index 6d6e90a2..e15c55d6 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -15,7 +15,7 @@ func TestEvenHandler(t *testing.T) {
err := eh.SubscribeP(id, "http.EventJobOK", ch)
require.NoError(t, err)
- eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
evt := <-ch
require.Equal(t, "foo", evt.Message())
@@ -37,7 +37,7 @@ func TestEvenHandler2(t *testing.T) {
err = eh.SubscribeP(id, "http.EventJobOK", ch2)
require.NoError(t, err)
- eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
evt := <-ch2
require.Equal(t, "foo", evt.Message())
@@ -85,7 +85,7 @@ func TestEvenHandler5(t *testing.T) {
err := eh.SubscribeP(id, "http.EventJobOK", ch)
require.NoError(t, err)
- eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+ eh.Send(NewEvent(EventJobOK, "http", "foo"))
evt := <-ch
require.Equal(t, "foo", evt.Message())
diff --git a/events/types.go b/events/types.go
index d8e40084..65a76d15 100644
--- a/events/types.go
+++ b/events/types.go
@@ -17,173 +17,30 @@ type Event interface {
type RREvent struct {
// event typ
- T EventType
+ typ EventType
// plugin
- P string
+ plugin string
// message
- M string
+ message string
}
-// NewRREvent initializes new event
-func NewRREvent(t EventType, msg string, plugin string) *RREvent {
- // get
+// NewEvent initializes new event
+func NewEvent(t EventType, plugin string, msg string) *RREvent {
return &RREvent{
- T: t,
- P: plugin,
- M: msg,
+ typ: t,
+ plugin: plugin,
+ message: msg,
}
}
func (r *RREvent) Type() EventType {
- return r.T
+ return r.typ
}
func (r *RREvent) Message() string {
- return r.M
+ return r.message
}
func (r *RREvent) Plugin() string {
- return r.P
-}
-
-type EventType uint32
-
-const (
- // EventUnaryCallOk represents success unary call response
- EventUnaryCallOk EventType = iota
-
- // EventUnaryCallErr raised when unary call ended with error
- EventUnaryCallErr
-
- // EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK
-
- // EventPushError caused when job can not be registered.
- EventPushError
-
- // EventJobStart thrown when new job received.
- EventJobStart
-
- // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
- EventJobOK
-
- // EventJobError thrown on all job related errors. See JobError as context.
- EventJobError
-
- // EventPipeActive when pipeline has started.
- EventPipeActive
-
- // EventPipeStopped when pipeline has been stopped.
- EventPipeStopped
-
- // EventPipePaused when pipeline has been paused.
- EventPipePaused
-
- // EventPipeError when pipeline specific error happen.
- EventPipeError
-
- // EventDriverReady thrown when broken is ready to accept/serve tasks.
- EventDriverReady
-
- // EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct
-
- // EventWorkerDestruct thrown after worker destruction.
- EventWorkerDestruct
-
- // EventSupervisorError triggered when supervisor can not complete work.
- EventSupervisorError
-
- // EventWorkerProcessExit triggered on process wait exit
- EventWorkerProcessExit
-
- // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
- EventNoFreeWorkers
-
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
-
- // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
- EventTTL
-
- // EventIdleTTL triggered when worker spends too much time at rest.
- EventIdleTTL
-
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventExecTTL
-
- // EventPoolRestart triggered when pool restart is needed
- EventPoolRestart
-
- // EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError
- // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
- EventWorkerLog
- // EventWorkerStderr is the worker standard error output
- EventWorkerStderr
- // EventWorkerWaitExit is the worker exit event
- EventWorkerWaitExit
-)
-
-func (et EventType) String() string {
- switch et {
- case EventPushOK:
- return "EventPushOK"
- case EventPushError:
- return "EventPushError"
- case EventJobStart:
- return "EventJobStart"
- case EventJobOK:
- return "EventJobOK"
- case EventJobError:
- return "EventJobError"
- case EventPipeActive:
- return "EventPipeActive"
- case EventPipeStopped:
- return "EventPipeStopped"
- case EventPipeError:
- return "EventPipeError"
- case EventDriverReady:
- return "EventDriverReady"
- case EventPipePaused:
- return "EventPipePaused"
-
- case EventUnaryCallOk:
- return "EventUnaryCallOk"
- case EventUnaryCallErr:
- return "EventUnaryCallErr"
-
- case EventWorkerProcessExit:
- return "EventWorkerProcessExit"
- case EventWorkerConstruct:
- return "EventWorkerConstruct"
- case EventWorkerDestruct:
- return "EventWorkerDestruct"
- case EventSupervisorError:
- return "EventSupervisorError"
- case EventNoFreeWorkers:
- return "EventNoFreeWorkers"
- case EventMaxMemory:
- return "EventMaxMemory"
- case EventTTL:
- return "EventTTL"
- case EventIdleTTL:
- return "EventIdleTTL"
- case EventExecTTL:
- return "EventExecTTL"
- case EventPoolRestart:
- return "EventPoolRestart"
-
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
- case EventWorkerStderr:
- return "EventWorkerStderr"
- case EventWorkerWaitExit:
- return "EventWorkerWaitExit"
-
- default:
- return "UnknownEventType"
- }
+ return r.plugin
}