diff options
Diffstat (limited to 'pkg/events')
-rwxr-xr-x | pkg/events/general.go | 39 | ||||
-rw-r--r-- | pkg/events/interface.go | 14 | ||||
-rw-r--r-- | pkg/events/pool_events.go | 66 | ||||
-rw-r--r-- | pkg/events/worker_events.go | 33 |
4 files changed, 152 insertions, 0 deletions
diff --git a/pkg/events/general.go b/pkg/events/general.go new file mode 100755 index 00000000..a09a8759 --- /dev/null +++ b/pkg/events/general.go @@ -0,0 +1,39 @@ +package events + +import ( + "sync" +) + +// HandlerImpl helps to broadcast events to multiple listeners. +type HandlerImpl struct { + listeners []Listener + sync.RWMutex // all receivers should be pointers +} + +func NewEventsHandler() Handler { + return &HandlerImpl{listeners: make([]Listener, 0, 2)} +} + +// NumListeners returns number of event listeners. +func (eb *HandlerImpl) NumListeners() int { + eb.Lock() + defer eb.Unlock() + return len(eb.listeners) +} + +// AddListener registers new event listener. +func (eb *HandlerImpl) AddListener(listener Listener) { + eb.Lock() + defer eb.Unlock() + eb.listeners = append(eb.listeners, listener) +} + +// Push broadcast events across all event listeners. +func (eb *HandlerImpl) Push(e interface{}) { + // ReadLock here because we are not changing listeners + eb.RLock() + defer eb.RUnlock() + for k := range eb.listeners { + eb.listeners[k](e) + } +} diff --git a/pkg/events/interface.go b/pkg/events/interface.go new file mode 100644 index 00000000..ac6c15a4 --- /dev/null +++ b/pkg/events/interface.go @@ -0,0 +1,14 @@ +package events + +// Handler interface +type Handler interface { + // Return number of active listeners + NumListeners() int + // AddListener adds lister to the publisher + AddListener(listener Listener) + // Push pushes event to the listeners + Push(e interface{}) +} + +// Event listener listens for the events produced by worker, worker pool or other service. +type Listener func(event interface{}) diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go new file mode 100644 index 00000000..3925df56 --- /dev/null +++ b/pkg/events/pool_events.go @@ -0,0 +1,66 @@ +package events + +// TODO event numbers +const ( + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct P = iota + 10000 + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventPoolError caused on pool wide errors. + EventPoolError + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL +) + +type P int64 + +func (ev P) String() string { + switch ev { + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventPoolError: + return "EventPoolError" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + } + return "Unknown event type" +} + +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event P + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go new file mode 100644 index 00000000..9d428f7d --- /dev/null +++ b/pkg/events/worker_events.go @@ -0,0 +1,33 @@ +package events + +const ( + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError W = iota + 11000 + + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog +) + +type W int64 + +func (ev W) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + } + return "Unknown event type" +} + +// WorkerEvent wraps worker events. +type WorkerEvent struct { + // Event id, see below. + Event W + + // Worker triggered the event. + Worker interface{} + + // Event specific payload. + Payload interface{} +} |