summaryrefslogtreecommitdiff
path: root/events
diff options
context:
space:
mode:
Diffstat (limited to 'events')
-rwxr-xr-xevents/general.go41
-rw-r--r--events/grpc_event.go39
-rw-r--r--events/interface.go14
-rw-r--r--events/jobs_events.go81
-rw-r--r--events/pool_events.go71
-rw-r--r--events/worker_events.go36
6 files changed, 282 insertions, 0 deletions
diff --git a/events/general.go b/events/general.go
new file mode 100755
index 00000000..5cf13e10
--- /dev/null
+++ b/events/general.go
@@ -0,0 +1,41 @@
+package events
+
+import (
+ "sync"
+)
+
+const UnknownEventType string = "Unknown event type"
+
+// HandlerImpl helps to broadcast events to multiple listeners.
+type HandlerImpl struct {
+ listeners []Listener
+ sync.RWMutex // all receivers should be pointers
+}
+
+func NewEventsHandler() Handler {
+ return &HandlerImpl{listeners: make([]Listener, 0, 2)}
+}
+
+// NumListeners returns number of event listeners.
+func (eb *HandlerImpl) NumListeners() int {
+ eb.Lock()
+ defer eb.Unlock()
+ return len(eb.listeners)
+}
+
+// AddListener registers new event listener.
+func (eb *HandlerImpl) AddListener(listener Listener) {
+ eb.Lock()
+ defer eb.Unlock()
+ eb.listeners = append(eb.listeners, listener)
+}
+
+// Push broadcast events across all event listeners.
+func (eb *HandlerImpl) Push(e interface{}) {
+ // ReadLock here because we are not changing listeners
+ eb.RLock()
+ defer eb.RUnlock()
+ for k := range eb.listeners {
+ eb.listeners[k](e)
+ }
+}
diff --git a/events/grpc_event.go b/events/grpc_event.go
new file mode 100644
index 00000000..31ff4957
--- /dev/null
+++ b/events/grpc_event.go
@@ -0,0 +1,39 @@
+package events
+
+import (
+ "time"
+
+ "google.golang.org/grpc"
+)
+
+const (
+ // EventUnaryCallOk represents success unary call response
+ EventUnaryCallOk G = iota + 13000
+
+ // EventUnaryCallErr raised when unary call ended with error
+ EventUnaryCallErr
+)
+
+type G int64
+
+func (ev G) String() string {
+ switch ev {
+ case EventUnaryCallOk:
+ return "EventUnaryCallOk"
+ case EventUnaryCallErr:
+ return "EventUnaryCallErr"
+ }
+ return UnknownEventType
+}
+
+// JobEvent represent job event.
+type GRPCEvent struct {
+ Event G
+ // Info contains unary call info.
+ Info *grpc.UnaryServerInfo
+ // Error associated with event.
+ Error error
+ // event timings
+ Start time.Time
+ Elapsed time.Duration
+}
diff --git a/events/interface.go b/events/interface.go
new file mode 100644
index 00000000..7d57e4d0
--- /dev/null
+++ b/events/interface.go
@@ -0,0 +1,14 @@
+package events
+
+// Handler interface
+type Handler interface {
+ // NumListeners return number of active listeners
+ NumListeners() int
+ // AddListener adds lister to the publisher
+ AddListener(listener Listener)
+ // Push pushes event to the listeners
+ Push(e interface{})
+}
+
+// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service.
+type Listener func(event interface{})
diff --git a/events/jobs_events.go b/events/jobs_events.go
new file mode 100644
index 00000000..f65ede67
--- /dev/null
+++ b/events/jobs_events.go
@@ -0,0 +1,81 @@
+package events
+
+import (
+ "time"
+)
+
+const (
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK J = iota + 12000
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipePaused when pipeline has been paused.
+ EventPipePaused
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventDriverReady thrown when broken is ready to accept/serve tasks.
+ EventDriverReady
+)
+
+type J int64
+
+func (ev J) String() string {
+ switch ev {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventDriverReady:
+ return "EventDriverReady"
+ case EventPipePaused:
+ return "EventPipePaused"
+ }
+ return UnknownEventType
+}
+
+// JobEvent represent job event.
+type JobEvent struct {
+ Event J
+ // String is job id.
+ ID string
+ // Pipeline name
+ Pipeline string
+ // Associated driver name (amqp, ephemeral, etc)
+ Driver string
+ // Error for the jobs/pipes errors
+ Error error
+ // event timings
+ Start time.Time
+ Elapsed time.Duration
+}
diff --git a/events/pool_events.go b/events/pool_events.go
new file mode 100644
index 00000000..eb28df6a
--- /dev/null
+++ b/events/pool_events.go
@@ -0,0 +1,71 @@
+package events
+
+const (
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct P = iota + 10000
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventWorkerProcessExit triggered on process wait exit
+ EventWorkerProcessExit
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+
+ // EventPoolRestart triggered when pool restart is needed
+ EventPoolRestart
+)
+
+type P int64
+
+func (ev P) String() string {
+ switch ev {
+ case EventWorkerProcessExit:
+ return "EventWorkerProcessExit"
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ case EventPoolRestart:
+ return "EventPoolRestart"
+ }
+ return UnknownEventType
+}
+
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event P
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+ Error error
+}
diff --git a/events/worker_events.go b/events/worker_events.go
new file mode 100644
index 00000000..39c38e57
--- /dev/null
+++ b/events/worker_events.go
@@ -0,0 +1,36 @@
+package events
+
+const (
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError W = iota + 11000
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+ // EventWorkerStderr is the worker standard error output
+ EventWorkerStderr
+)
+
+type W int64
+
+func (ev W) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ case EventWorkerStderr:
+ return "EventWorkerStderr"
+ }
+ return UnknownEventType
+}
+
+// WorkerEvent wraps worker events.
+type WorkerEvent struct {
+ // Event id, see below.
+ Event W
+
+ // Worker triggered the event.
+ Worker interface{}
+
+ // Event specific payload.
+ Payload interface{}
+}