summaryrefslogtreecommitdiff
path: root/pkg/events
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/events')
-rwxr-xr-xpkg/events/general.go39
-rw-r--r--pkg/events/interface.go14
-rw-r--r--pkg/events/pool_events.go66
-rw-r--r--pkg/events/worker_events.go33
4 files changed, 152 insertions, 0 deletions
diff --git a/pkg/events/general.go b/pkg/events/general.go
new file mode 100755
index 00000000..a09a8759
--- /dev/null
+++ b/pkg/events/general.go
@@ -0,0 +1,39 @@
+package events
+
+import (
+ "sync"
+)
+
+// HandlerImpl helps to broadcast events to multiple listeners.
+type HandlerImpl struct {
+ listeners []Listener
+ sync.RWMutex // all receivers should be pointers
+}
+
+func NewEventsHandler() Handler {
+ return &HandlerImpl{listeners: make([]Listener, 0, 2)}
+}
+
+// NumListeners returns number of event listeners.
+func (eb *HandlerImpl) NumListeners() int {
+ eb.Lock()
+ defer eb.Unlock()
+ return len(eb.listeners)
+}
+
+// AddListener registers new event listener.
+func (eb *HandlerImpl) AddListener(listener Listener) {
+ eb.Lock()
+ defer eb.Unlock()
+ eb.listeners = append(eb.listeners, listener)
+}
+
+// Push broadcast events across all event listeners.
+func (eb *HandlerImpl) Push(e interface{}) {
+ // ReadLock here because we are not changing listeners
+ eb.RLock()
+ defer eb.RUnlock()
+ for k := range eb.listeners {
+ eb.listeners[k](e)
+ }
+}
diff --git a/pkg/events/interface.go b/pkg/events/interface.go
new file mode 100644
index 00000000..ac6c15a4
--- /dev/null
+++ b/pkg/events/interface.go
@@ -0,0 +1,14 @@
+package events
+
+// Handler interface
+type Handler interface {
+ // Return number of active listeners
+ NumListeners() int
+ // AddListener adds lister to the publisher
+ AddListener(listener Listener)
+ // Push pushes event to the listeners
+ Push(e interface{})
+}
+
+// Event listener listens for the events produced by worker, worker pool or other service.
+type Listener func(event interface{})
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
new file mode 100644
index 00000000..3925df56
--- /dev/null
+++ b/pkg/events/pool_events.go
@@ -0,0 +1,66 @@
+package events
+
+// TODO event numbers
+const (
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct P = iota + 10000
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventPoolError caused on pool wide errors.
+ EventPoolError
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+)
+
+type P int64
+
+func (ev P) String() string {
+ switch ev {
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventPoolError:
+ return "EventPoolError"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ }
+ return "Unknown event type"
+}
+
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event P
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
new file mode 100644
index 00000000..9d428f7d
--- /dev/null
+++ b/pkg/events/worker_events.go
@@ -0,0 +1,33 @@
+package events
+
+const (
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError W = iota + 11000
+
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+)
+
+type W int64
+
+func (ev W) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ }
+ return "Unknown event type"
+}
+
+// WorkerEvent wraps worker events.
+type WorkerEvent struct {
+ // Event id, see below.
+ Event W
+
+ // Worker triggered the event.
+ Worker interface{}
+
+ // Event specific payload.
+ Payload interface{}
+}