summaryrefslogtreecommitdiff
path: root/events
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
committerValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
commit9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch)
tree8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /events
parent160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff)
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'events')
-rw-r--r--events/docs/events.md0
-rw-r--r--events/events_test.go22
-rw-r--r--events/eventsbus.go129
-rwxr-xr-xevents/general.go41
-rw-r--r--events/grpc_event.go39
-rw-r--r--events/init.go20
-rw-r--r--events/interface.go14
-rw-r--r--events/jobs_events.go81
-rw-r--r--events/pool_events.go71
-rw-r--r--events/types.go196
-rw-r--r--events/wildcard.go43
-rw-r--r--events/wildcard_test.go48
-rw-r--r--events/worker_events.go40
13 files changed, 458 insertions, 286 deletions
diff --git a/events/docs/events.md b/events/docs/events.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/events/docs/events.md
diff --git a/events/events_test.go b/events/events_test.go
new file mode 100644
index 00000000..6c501392
--- /dev/null
+++ b/events/events_test.go
@@ -0,0 +1,22 @@
+package events
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestEvenHandler(t *testing.T) {
+ eh, id := Bus()
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+}
diff --git a/events/eventsbus.go b/events/eventsbus.go
new file mode 100644
index 00000000..79f5babd
--- /dev/null
+++ b/events/eventsbus.go
@@ -0,0 +1,129 @@
+package events
+
+import (
+ "fmt"
+ "sync"
+)
+
+type sub struct {
+ pattern string
+ w *wildcard
+ events chan<- Event
+}
+
+type eventsBus struct {
+ sync.RWMutex
+ subscribers sync.Map
+ internalEvCh chan Event
+ stop chan struct{}
+}
+
+func newEventsBus() *eventsBus {
+ return &eventsBus{
+ internalEvCh: make(chan Event, 100),
+ stop: make(chan struct{}),
+ }
+}
+
+/*
+http.* <-
+*/
+
+// SubscribeAll for all RR events
+// returns subscriptionID
+func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error {
+ return eb.subscribe(subID, "*", ch)
+}
+
+// SubscribeP pattern like "pluginName.EventType"
+func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error {
+ return eb.subscribe(subID, pattern, ch)
+}
+
+func (eb *eventsBus) Unsubscribe(subID string) {
+ eb.subscribers.Delete(subID)
+}
+func (eb *eventsBus) UnsubscribeP(subID, pattern string) {
+ if sb, ok := eb.subscribers.Load(subID); ok {
+ eb.Lock()
+ defer eb.Unlock()
+
+ sbArr := sb.([]*sub)
+
+ for i := 0; i < len(sbArr); i++ {
+ if sbArr[i].pattern == pattern {
+ sbArr[i] = sbArr[len(sbArr)-1]
+ sbArr = sbArr[:len(sbArr)-1]
+ // replace with new array
+ eb.subscribers.Store(subID, sbArr)
+ return
+ }
+ }
+ }
+}
+
+// Send sends event to the events bus
+func (eb *eventsBus) Send(ev Event) {
+ eb.internalEvCh <- ev
+}
+
+func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error {
+ eb.Lock()
+ defer eb.Unlock()
+ w, err := newWildcard(pattern)
+ if err != nil {
+ return err
+ }
+
+ if sb, ok := eb.subscribers.Load(subID); ok {
+ // at this point we are confident that sb is a []*sub type
+ subArr := sb.([]*sub)
+ subArr = append(subArr, &sub{
+ pattern: pattern,
+ w: w,
+ events: ch,
+ })
+
+ eb.subscribers.Store(subID, subArr)
+
+ return nil
+ }
+
+ subArr := make([]*sub, 0, 5)
+ subArr = append(subArr, &sub{
+ pattern: pattern,
+ w: w,
+ events: ch,
+ })
+
+ eb.subscribers.Store(subID, subArr)
+
+ return nil
+}
+
+func (eb *eventsBus) handleEvents() {
+ for { //nolint:gosimple
+ select {
+ case ev := <-eb.internalEvCh:
+ //
+ wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String())
+
+ eb.subscribers.Range(func(key, value interface{}) bool {
+ vsub := value.([]*sub)
+
+ for i := 0; i < len(vsub); i++ {
+ if vsub[i].w.match(wc) {
+ select {
+ case vsub[i].events <- ev:
+ return true
+ default:
+ return true
+ }
+ }
+ }
+
+ return true
+ })
+ }
+ }
+}
diff --git a/events/general.go b/events/general.go
deleted file mode 100755
index 5cf13e10..00000000
--- a/events/general.go
+++ /dev/null
@@ -1,41 +0,0 @@
-package events
-
-import (
- "sync"
-)
-
-const UnknownEventType string = "Unknown event type"
-
-// HandlerImpl helps to broadcast events to multiple listeners.
-type HandlerImpl struct {
- listeners []Listener
- sync.RWMutex // all receivers should be pointers
-}
-
-func NewEventsHandler() Handler {
- return &HandlerImpl{listeners: make([]Listener, 0, 2)}
-}
-
-// NumListeners returns number of event listeners.
-func (eb *HandlerImpl) NumListeners() int {
- eb.Lock()
- defer eb.Unlock()
- return len(eb.listeners)
-}
-
-// AddListener registers new event listener.
-func (eb *HandlerImpl) AddListener(listener Listener) {
- eb.Lock()
- defer eb.Unlock()
- eb.listeners = append(eb.listeners, listener)
-}
-
-// Push broadcast events across all event listeners.
-func (eb *HandlerImpl) Push(e interface{}) {
- // ReadLock here because we are not changing listeners
- eb.RLock()
- defer eb.RUnlock()
- for k := range eb.listeners {
- eb.listeners[k](e)
- }
-}
diff --git a/events/grpc_event.go b/events/grpc_event.go
deleted file mode 100644
index 31ff4957..00000000
--- a/events/grpc_event.go
+++ /dev/null
@@ -1,39 +0,0 @@
-package events
-
-import (
- "time"
-
- "google.golang.org/grpc"
-)
-
-const (
- // EventUnaryCallOk represents success unary call response
- EventUnaryCallOk G = iota + 13000
-
- // EventUnaryCallErr raised when unary call ended with error
- EventUnaryCallErr
-)
-
-type G int64
-
-func (ev G) String() string {
- switch ev {
- case EventUnaryCallOk:
- return "EventUnaryCallOk"
- case EventUnaryCallErr:
- return "EventUnaryCallErr"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type GRPCEvent struct {
- Event G
- // Info contains unary call info.
- Info *grpc.UnaryServerInfo
- // Error associated with event.
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/events/init.go b/events/init.go
new file mode 100644
index 00000000..25e92fc5
--- /dev/null
+++ b/events/init.go
@@ -0,0 +1,20 @@
+package events
+
+import (
+ "sync"
+
+ "github.com/google/uuid"
+)
+
+var evBus *eventsBus
+var onInit = &sync.Once{}
+
+func Bus() (*eventsBus, string) {
+ onInit.Do(func() {
+ evBus = newEventsBus()
+ go evBus.handleEvents()
+ })
+
+ // return events bus with subscriberID
+ return evBus, uuid.NewString()
+}
diff --git a/events/interface.go b/events/interface.go
deleted file mode 100644
index 7d57e4d0..00000000
--- a/events/interface.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package events
-
-// Handler interface
-type Handler interface {
- // NumListeners return number of active listeners
- NumListeners() int
- // AddListener adds lister to the publisher
- AddListener(listener Listener)
- // Push pushes event to the listeners
- Push(e interface{})
-}
-
-// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service.
-type Listener func(event interface{})
diff --git a/events/jobs_events.go b/events/jobs_events.go
deleted file mode 100644
index f65ede67..00000000
--- a/events/jobs_events.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package events
-
-import (
- "time"
-)
-
-const (
- // EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK J = iota + 12000
-
- // EventPushError caused when job can not be registered.
- EventPushError
-
- // EventJobStart thrown when new job received.
- EventJobStart
-
- // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
- EventJobOK
-
- // EventJobError thrown on all job related errors. See JobError as context.
- EventJobError
-
- // EventPipeActive when pipeline has started.
- EventPipeActive
-
- // EventPipeStopped when pipeline has been stopped.
- EventPipeStopped
-
- // EventPipePaused when pipeline has been paused.
- EventPipePaused
-
- // EventPipeError when pipeline specific error happen.
- EventPipeError
-
- // EventDriverReady thrown when broken is ready to accept/serve tasks.
- EventDriverReady
-)
-
-type J int64
-
-func (ev J) String() string {
- switch ev {
- case EventPushOK:
- return "EventPushOK"
- case EventPushError:
- return "EventPushError"
- case EventJobStart:
- return "EventJobStart"
- case EventJobOK:
- return "EventJobOK"
- case EventJobError:
- return "EventJobError"
- case EventPipeActive:
- return "EventPipeActive"
- case EventPipeStopped:
- return "EventPipeStopped"
- case EventPipeError:
- return "EventPipeError"
- case EventDriverReady:
- return "EventDriverReady"
- case EventPipePaused:
- return "EventPipePaused"
- }
- return UnknownEventType
-}
-
-// JobEvent represent job event.
-type JobEvent struct {
- Event J
- // String is job id.
- ID string
- // Pipeline name
- Pipeline string
- // Associated driver name (amqp, ephemeral, etc)
- Driver string
- // Error for the jobs/pipes errors
- Error error
- // event timings
- Start time.Time
- Elapsed time.Duration
-}
diff --git a/events/pool_events.go b/events/pool_events.go
deleted file mode 100644
index eb28df6a..00000000
--- a/events/pool_events.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package events
-
-const (
- // EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct P = iota + 10000
-
- // EventWorkerDestruct thrown after worker destruction.
- EventWorkerDestruct
-
- // EventSupervisorError triggered when supervisor can not complete work.
- EventSupervisorError
-
- // EventWorkerProcessExit triggered on process wait exit
- EventWorkerProcessExit
-
- // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
- EventNoFreeWorkers
-
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
-
- // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
- EventTTL
-
- // EventIdleTTL triggered when worker spends too much time at rest.
- EventIdleTTL
-
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventExecTTL
-
- // EventPoolRestart triggered when pool restart is needed
- EventPoolRestart
-)
-
-type P int64
-
-func (ev P) String() string {
- switch ev {
- case EventWorkerProcessExit:
- return "EventWorkerProcessExit"
- case EventWorkerConstruct:
- return "EventWorkerConstruct"
- case EventWorkerDestruct:
- return "EventWorkerDestruct"
- case EventSupervisorError:
- return "EventSupervisorError"
- case EventNoFreeWorkers:
- return "EventNoFreeWorkers"
- case EventMaxMemory:
- return "EventMaxMemory"
- case EventTTL:
- return "EventTTL"
- case EventIdleTTL:
- return "EventIdleTTL"
- case EventExecTTL:
- return "EventExecTTL"
- case EventPoolRestart:
- return "EventPoolRestart"
- }
- return UnknownEventType
-}
-
-// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
-type PoolEvent struct {
- // Event type, see below.
- Event P
-
- // Payload depends on event type, typically it's worker or error.
- Payload interface{}
- Error error
-}
diff --git a/events/types.go b/events/types.go
new file mode 100644
index 00000000..09bbf7a7
--- /dev/null
+++ b/events/types.go
@@ -0,0 +1,196 @@
+package events
+
+import (
+ "fmt"
+)
+
+type EventBus interface {
+ SubscribeAll(subID string, ch chan<- Event) error
+ SubscribeP(subID string, pattern string, ch chan<- Event) error
+ Unsubscribe(subID string)
+ UnsubscribeP(subID, pattern string)
+ Send(ev Event)
+}
+
+type Event interface {
+ fmt.Stringer
+ Plugin() string
+ Type() EventType
+ Message() string
+}
+
+type RREvent struct {
+ // event typ
+ T EventType
+ // plugin
+ P string
+ // message
+ M string
+}
+
+// NewRREvent initializes new event
+func NewRREvent(t EventType, msg string, plugin string) *RREvent {
+ return &RREvent{
+ T: t,
+ P: plugin,
+ M: msg,
+ }
+}
+
+func (r *RREvent) String() string {
+ return "RoadRunner event"
+}
+
+func (r *RREvent) Type() EventType {
+ return r.T
+}
+
+func (r *RREvent) Message() string {
+ return r.M
+}
+
+func (r *RREvent) Plugin() string {
+ return r.P
+}
+
+type EventType uint32
+
+const (
+ // EventUnaryCallOk represents success unary call response
+ EventUnaryCallOk EventType = iota
+
+ // EventUnaryCallErr raised when unary call ended with error
+ EventUnaryCallErr
+
+ // EventPushOK thrown when new job has been added. JobEvent is passed as context.
+ EventPushOK
+
+ // EventPushError caused when job can not be registered.
+ EventPushError
+
+ // EventJobStart thrown when new job received.
+ EventJobStart
+
+ // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
+ EventJobOK
+
+ // EventJobError thrown on all job related errors. See JobError as context.
+ EventJobError
+
+ // EventPipeActive when pipeline has started.
+ EventPipeActive
+
+ // EventPipeStopped when pipeline has been stopped.
+ EventPipeStopped
+
+ // EventPipePaused when pipeline has been paused.
+ EventPipePaused
+
+ // EventPipeError when pipeline specific error happen.
+ EventPipeError
+
+ // EventDriverReady thrown when broken is ready to accept/serve tasks.
+ EventDriverReady
+
+ // EventWorkerConstruct thrown when new worker is spawned.
+ EventWorkerConstruct
+
+ // EventWorkerDestruct thrown after worker destruction.
+ EventWorkerDestruct
+
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
+
+ // EventWorkerProcessExit triggered on process wait exit
+ EventWorkerProcessExit
+
+ // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+ EventNoFreeWorkers
+
+ // EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
+ EventTTL
+
+ // EventIdleTTL triggered when worker spends too much time at rest.
+ EventIdleTTL
+
+ // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ EventExecTTL
+
+ // EventPoolRestart triggered when pool restart is needed
+ EventPoolRestart
+
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+ // EventWorkerStderr is the worker standard error output
+ EventWorkerStderr
+ // EventWorkerWaitExit is the worker exit event
+ EventWorkerWaitExit
+)
+
+func (et EventType) String() string {
+ switch et {
+ case EventPushOK:
+ return "EventPushOK"
+ case EventPushError:
+ return "EventPushError"
+ case EventJobStart:
+ return "EventJobStart"
+ case EventJobOK:
+ return "EventJobOK"
+ case EventJobError:
+ return "EventJobError"
+ case EventPipeActive:
+ return "EventPipeActive"
+ case EventPipeStopped:
+ return "EventPipeStopped"
+ case EventPipeError:
+ return "EventPipeError"
+ case EventDriverReady:
+ return "EventDriverReady"
+ case EventPipePaused:
+ return "EventPipePaused"
+
+ case EventUnaryCallOk:
+ return "EventUnaryCallOk"
+ case EventUnaryCallErr:
+ return "EventUnaryCallErr"
+
+ case EventWorkerProcessExit:
+ return "EventWorkerProcessExit"
+ case EventWorkerConstruct:
+ return "EventWorkerConstruct"
+ case EventWorkerDestruct:
+ return "EventWorkerDestruct"
+ case EventSupervisorError:
+ return "EventSupervisorError"
+ case EventNoFreeWorkers:
+ return "EventNoFreeWorkers"
+ case EventMaxMemory:
+ return "EventMaxMemory"
+ case EventTTL:
+ return "EventTTL"
+ case EventIdleTTL:
+ return "EventIdleTTL"
+ case EventExecTTL:
+ return "EventExecTTL"
+ case EventPoolRestart:
+ return "EventPoolRestart"
+
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ case EventWorkerStderr:
+ return "EventWorkerStderr"
+ case EventWorkerWaitExit:
+ return "EventWorkerWaitExit"
+
+ default:
+ return "UnknownEventType"
+ }
+}
diff --git a/events/wildcard.go b/events/wildcard.go
new file mode 100644
index 00000000..171cbf9d
--- /dev/null
+++ b/events/wildcard.go
@@ -0,0 +1,43 @@
+package events
+
+import (
+ "strings"
+
+ "github.com/spiral/errors"
+)
+
+type wildcard struct {
+ prefix string
+ suffix string
+}
+
+func newWildcard(pattern string) (*wildcard, error) {
+ // Normalize
+ origin := strings.ToLower(pattern)
+ i := strings.IndexByte(origin, '*')
+
+ /*
+ http.*
+ *
+ *.WorkerError
+ */
+ if i == -1 {
+ dotI := strings.IndexByte(pattern, '.')
+
+ if dotI == -1 {
+ // http.SuperEvent
+ return nil, errors.Str("wrong wildcard, no * or .")
+ }
+
+ return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil
+ }
+
+ // pref: http.
+ // suff: *
+ return &wildcard{origin[0:i], origin[i+1:]}, nil
+}
+
+func (w wildcard) match(s string) bool {
+ s = strings.ToLower(s)
+ return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix)
+}
diff --git a/events/wildcard_test.go b/events/wildcard_test.go
new file mode 100644
index 00000000..230ef673
--- /dev/null
+++ b/events/wildcard_test.go
@@ -0,0 +1,48 @@
+package events
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestWildcard(t *testing.T) {
+ w, err := newWildcard("http.*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.True(t, w.match("http.****"))
+
+ // *.* -> *
+ w, err = newWildcard("*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.SuperEvent"))
+ assert.True(t, w.match("https.SuperEvent"))
+ assert.True(t, w.match(""))
+ assert.True(t, w.match("*"))
+ assert.True(t, w.match("****"))
+ assert.True(t, w.match("http.****"))
+
+ w, err = newWildcard("*.WorkerError")
+ assert.NoError(t, err)
+ assert.False(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.False(t, w.match("http.****"))
+ assert.True(t, w.match("http.WorkerError"))
+
+ w, err = newWildcard("http.WorkerError")
+ assert.NoError(t, err)
+ assert.False(t, w.match("http.SuperEvent"))
+ assert.False(t, w.match("https.SuperEvent"))
+ assert.False(t, w.match(""))
+ assert.False(t, w.match("*"))
+ assert.False(t, w.match("****"))
+ assert.False(t, w.match("http.****"))
+ assert.True(t, w.match("http.WorkerError"))
+}
diff --git a/events/worker_events.go b/events/worker_events.go
deleted file mode 100644
index 6b80df61..00000000
--- a/events/worker_events.go
+++ /dev/null
@@ -1,40 +0,0 @@
-package events
-
-const (
- // EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError W = iota + 11000
- // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
- EventWorkerLog
- // EventWorkerStderr is the worker standard error output
- EventWorkerStderr
- // EventWorkerWaitExit is the worker exit event
- EventWorkerWaitExit
-)
-
-type W int64
-
-func (ev W) String() string {
- switch ev {
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerLog:
- return "EventWorkerLog"
- case EventWorkerStderr:
- return "EventWorkerStderr"
- case EventWorkerWaitExit:
- return "EventWorkerWaitExit"
- }
- return UnknownEventType
-}
-
-// WorkerEvent wraps worker events.
-type WorkerEvent struct {
- // Event id, see below.
- Event W
-
- // Worker triggered the event.
- Worker interface{}
-
- // Event specific payload.
- Payload interface{}
-}