summaryrefslogtreecommitdiff
path: root/pkg/events
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
committerValery Piashchynski <[email protected]>2021-01-23 23:38:10 +0300
commit7fb3cc3588cfde9260a6bb431330ce1e0a71f56d (patch)
tree3200cf2136f7413a7e1cfc6ecdaa83716f9655f9 /pkg/events
parentee5d34abde7f3931bf939498eb7a8cb170232f4f (diff)
interfaces folder deprecated
Diffstat (limited to 'pkg/events')
-rwxr-xr-xpkg/events/general.go (renamed from pkg/events/events.go)10
-rw-r--r--pkg/events/interface.go14
-rw-r--r--pkg/events/pool_events.go66
-rw-r--r--pkg/events/worker_events.go33
4 files changed, 117 insertions, 6 deletions
diff --git a/pkg/events/events.go b/pkg/events/general.go
index 226a0c91..a09a8759 100755
--- a/pkg/events/events.go
+++ b/pkg/events/general.go
@@ -2,18 +2,16 @@ package events
import (
"sync"
-
- "github.com/spiral/roadrunner/v2/interfaces/events"
)
// HandlerImpl helps to broadcast events to multiple listeners.
type HandlerImpl struct {
- listeners []events.Listener
+ listeners []Listener
sync.RWMutex // all receivers should be pointers
}
-func NewEventsHandler() events.Handler {
- return &HandlerImpl{listeners: make([]events.Listener, 0, 2)}
+func NewEventsHandler() Handler {
+ return &HandlerImpl{listeners: make([]Listener, 0, 2)}
}
// NumListeners returns number of event listeners.
@@ -24,7 +22,7 @@ func (eb *HandlerImpl) NumListeners() int {
}
// AddListener registers new event listener.
-func (eb *HandlerImpl) AddListener(listener events.Listener) {
+func (eb *HandlerImpl) AddListener(listener Listener) {
eb.Lock()
defer eb.Unlock()
eb.listeners = append(eb.listeners, listener)
diff --git a/pkg/events/interface.go b/pkg/events/interface.go
new file mode 100644
index 00000000..ac6c15a4
--- /dev/null
+++ b/pkg/events/interface.go
@@ -0,0 +1,14 @@
+package events
+
+// Handler interface
+type Handler interface {
+ // Return number of active listeners
+ NumListeners() int
+ // AddListener adds lister to the publisher
+ AddListener(listener Listener)
+ // Push pushes event to the listeners
+ Push(e interface{})
+}
+
+// Event listener listens for the events produced by worker, worker pool or other service.
+type Listener func(event interface{})
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
new file mode 100644
index 00000000..2cc76eee
--- /dev/null
+++ b/pkg/events/pool_events.go
@@ -0,0 +1,66 @@
+package events
+
+// TODO event numbers
+const (
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct P = iota + 7800
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventPoolError caused on pool wide errors.
+ EventPoolError
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+)
+
+type P int64
+
+func (ev P) String() string {
+ switch ev {
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventPoolError:
+ return "EventPoolError"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ }
+ return "Unknown event type"
+}
+
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event P
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
new file mode 100644
index 00000000..2bff1811
--- /dev/null
+++ b/pkg/events/worker_events.go
@@ -0,0 +1,33 @@
+package events
+
+const (
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError W = iota + 200
+
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+)
+
+type W int64
+
+func (ev W) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ }
+ return "Unknown event type"
+}
+
+// WorkerEvent wraps worker events.
+type WorkerEvent struct {
+ // Event id, see below.
+ Event W
+
+ // Worker triggered the event.
+ Worker interface{}
+
+ // Event specific payload.
+ Payload interface{}
+}