diff options
Diffstat (limited to 'events')
-rwxr-xr-x | events/general.go | 41 | ||||
-rw-r--r-- | events/grpc_event.go | 39 | ||||
-rw-r--r-- | events/interface.go | 14 | ||||
-rw-r--r-- | events/jobs_events.go | 81 | ||||
-rw-r--r-- | events/pool_events.go | 71 | ||||
-rw-r--r-- | events/worker_events.go | 36 |
6 files changed, 282 insertions, 0 deletions
diff --git a/events/general.go b/events/general.go new file mode 100755 index 00000000..5cf13e10 --- /dev/null +++ b/events/general.go @@ -0,0 +1,41 @@ +package events + +import ( + "sync" +) + +const UnknownEventType string = "Unknown event type" + +// HandlerImpl helps to broadcast events to multiple listeners. +type HandlerImpl struct { + listeners []Listener + sync.RWMutex // all receivers should be pointers +} + +func NewEventsHandler() Handler { + return &HandlerImpl{listeners: make([]Listener, 0, 2)} +} + +// NumListeners returns number of event listeners. +func (eb *HandlerImpl) NumListeners() int { + eb.Lock() + defer eb.Unlock() + return len(eb.listeners) +} + +// AddListener registers new event listener. +func (eb *HandlerImpl) AddListener(listener Listener) { + eb.Lock() + defer eb.Unlock() + eb.listeners = append(eb.listeners, listener) +} + +// Push broadcast events across all event listeners. +func (eb *HandlerImpl) Push(e interface{}) { + // ReadLock here because we are not changing listeners + eb.RLock() + defer eb.RUnlock() + for k := range eb.listeners { + eb.listeners[k](e) + } +} diff --git a/events/grpc_event.go b/events/grpc_event.go new file mode 100644 index 00000000..31ff4957 --- /dev/null +++ b/events/grpc_event.go @@ -0,0 +1,39 @@ +package events + +import ( + "time" + + "google.golang.org/grpc" +) + +const ( + // EventUnaryCallOk represents success unary call response + EventUnaryCallOk G = iota + 13000 + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr +) + +type G int64 + +func (ev G) String() string { + switch ev { + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + } + return UnknownEventType +} + +// JobEvent represent job event. +type GRPCEvent struct { + Event G + // Info contains unary call info. + Info *grpc.UnaryServerInfo + // Error associated with event. + Error error + // event timings + Start time.Time + Elapsed time.Duration +} diff --git a/events/interface.go b/events/interface.go new file mode 100644 index 00000000..7d57e4d0 --- /dev/null +++ b/events/interface.go @@ -0,0 +1,14 @@ +package events + +// Handler interface +type Handler interface { + // NumListeners return number of active listeners + NumListeners() int + // AddListener adds lister to the publisher + AddListener(listener Listener) + // Push pushes event to the listeners + Push(e interface{}) +} + +// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service. +type Listener func(event interface{}) diff --git a/events/jobs_events.go b/events/jobs_events.go new file mode 100644 index 00000000..f65ede67 --- /dev/null +++ b/events/jobs_events.go @@ -0,0 +1,81 @@ +package events + +import ( + "time" +) + +const ( + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK J = iota + 12000 + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipePaused when pipeline has been paused. + EventPipePaused + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady +) + +type J int64 + +func (ev J) String() string { + switch ev { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventDriverReady: + return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" + } + return UnknownEventType +} + +// JobEvent represent job event. +type JobEvent struct { + Event J + // String is job id. + ID string + // Pipeline name + Pipeline string + // Associated driver name (amqp, ephemeral, etc) + Driver string + // Error for the jobs/pipes errors + Error error + // event timings + Start time.Time + Elapsed time.Duration +} diff --git a/events/pool_events.go b/events/pool_events.go new file mode 100644 index 00000000..eb28df6a --- /dev/null +++ b/events/pool_events.go @@ -0,0 +1,71 @@ +package events + +const ( + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct P = iota + 10000 + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventWorkerProcessExit triggered on process wait exit + EventWorkerProcessExit + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart +) + +type P int64 + +func (ev P) String() string { + switch ev { + case EventWorkerProcessExit: + return "EventWorkerProcessExit" + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" + } + return UnknownEventType +} + +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event P + + // Payload depends on event type, typically it's worker or error. + Payload interface{} + Error error +} diff --git a/events/worker_events.go b/events/worker_events.go new file mode 100644 index 00000000..39c38e57 --- /dev/null +++ b/events/worker_events.go @@ -0,0 +1,36 @@ +package events + +const ( + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError W = iota + 11000 + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr +) + +type W int64 + +func (ev W) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" + } + return UnknownEventType +} + +// WorkerEvent wraps worker events. +type WorkerEvent struct { + // Event id, see below. + Event W + + // Worker triggered the event. + Worker interface{} + + // Event specific payload. + Payload interface{} +} |