diff options
Diffstat (limited to 'events')
-rw-r--r-- | events/docs/events.md | 0 | ||||
-rw-r--r-- | events/events.go | 143 | ||||
-rw-r--r-- | events/events_test.go | 94 | ||||
-rw-r--r-- | events/eventsbus.go | 170 | ||||
-rwxr-xr-x | events/general.go | 41 | ||||
-rw-r--r-- | events/grpc_event.go | 39 | ||||
-rw-r--r-- | events/init.go | 20 | ||||
-rw-r--r-- | events/interface.go | 14 | ||||
-rw-r--r-- | events/jobs_events.go | 81 | ||||
-rw-r--r-- | events/pool_events.go | 71 | ||||
-rw-r--r-- | events/types.go | 46 | ||||
-rw-r--r-- | events/wildcard.go | 43 | ||||
-rw-r--r-- | events/wildcard_test.go | 48 | ||||
-rw-r--r-- | events/worker_events.go | 40 |
14 files changed, 564 insertions, 286 deletions
diff --git a/events/docs/events.md b/events/docs/events.md new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/events/docs/events.md diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..b7396653 --- /dev/null +++ b/events/events.go @@ -0,0 +1,143 @@ +package events + +type EventType uint32 + +const ( + // EventUnaryCallOk represents success unary call response + EventUnaryCallOk EventType = iota + + // EventUnaryCallErr raised when unary call ended with error + EventUnaryCallErr + + // EventPushOK thrown when new job has been added. JobEvent is passed as context. + EventPushOK + + // EventPushError caused when job can not be registered. + EventPushError + + // EventJobStart thrown when new job received. + EventJobStart + + // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. + EventJobOK + + // EventJobError thrown on all job related errors. See JobError as context. + EventJobError + + // EventPipeActive when pipeline has started. + EventPipeActive + + // EventPipeStopped when pipeline has been stopped. + EventPipeStopped + + // EventPipePaused when pipeline has been paused. + EventPipePaused + + // EventPipeError when pipeline specific error happen. + EventPipeError + + // EventDriverReady thrown when broken is ready to accept/serve tasks. + EventDriverReady + + // EventWorkerConstruct thrown when new worker is spawned. + EventWorkerConstruct + + // EventWorkerDestruct thrown after worker destruction. + EventWorkerDestruct + + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // EventWorkerProcessExit triggered on process wait exit + EventWorkerProcessExit + + // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed + EventNoFreeWorkers + + // EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) + EventTTL + + // EventIdleTTL triggered when worker spends too much time at rest. + EventIdleTTL + + // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart + + // EventWorkerError triggered after WorkerProcess. Except payload to be error. + EventWorkerError + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. + EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr + // EventWorkerWaitExit is the worker exit event + EventWorkerWaitExit +) + +func (et EventType) String() string { + switch et { + case EventPushOK: + return "EventPushOK" + case EventPushError: + return "EventPushError" + case EventJobStart: + return "EventJobStart" + case EventJobOK: + return "EventJobOK" + case EventJobError: + return "EventJobError" + case EventPipeActive: + return "EventPipeActive" + case EventPipeStopped: + return "EventPipeStopped" + case EventPipeError: + return "EventPipeError" + case EventDriverReady: + return "EventDriverReady" + case EventPipePaused: + return "EventPipePaused" + + case EventUnaryCallOk: + return "EventUnaryCallOk" + case EventUnaryCallErr: + return "EventUnaryCallErr" + + case EventWorkerProcessExit: + return "EventWorkerProcessExit" + case EventWorkerConstruct: + return "EventWorkerConstruct" + case EventWorkerDestruct: + return "EventWorkerDestruct" + case EventSupervisorError: + return "EventSupervisorError" + case EventNoFreeWorkers: + return "EventNoFreeWorkers" + case EventMaxMemory: + return "EventMaxMemory" + case EventTTL: + return "EventTTL" + case EventIdleTTL: + return "EventIdleTTL" + case EventExecTTL: + return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" + + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" + case EventWorkerWaitExit: + return "EventWorkerWaitExit" + + default: + return "UnknownEventType" + } +} diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 00000000..e15c55d6 --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,94 @@ +package events + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEvenHandler(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + require.NoError(t, err) + + eh.Send(NewEvent(EventJobOK, "http", "foo")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) +} + +func TestEvenHandler2(t *testing.T) { + eh, id := Bus() + eh2, id2 := Bus() + defer eh.Unsubscribe(id) + defer eh2.Unsubscribe(id2) + + ch := make(chan Event, 100) + ch2 := make(chan Event, 100) + err := eh2.SubscribeP(id2, "http.EventJobOK", ch) + require.NoError(t, err) + + err = eh.SubscribeP(id, "http.EventJobOK", ch2) + require.NoError(t, err) + + eh.Send(NewEvent(EventJobOK, "http", "foo")) + + evt := <-ch2 + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) + + l := eh.Len() + require.Equal(t, uint(2), l) + + eh.Unsubscribe(id) + time.Sleep(time.Second) + + l = eh.Len() + require.Equal(t, uint(1), l) + + eh2.Unsubscribe(id2) + time.Sleep(time.Second) + + l = eh.Len() + require.Equal(t, uint(0), l) +} + +func TestEvenHandler3(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "EventJobOK", ch) + require.Error(t, err) +} + +func TestEvenHandler4(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + err := eh.SubscribeP(id, "EventJobOK", nil) + require.Error(t, err) +} + +func TestEvenHandler5(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + require.NoError(t, err) + + eh.Send(NewEvent(EventJobOK, "http", "foo")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) +} diff --git a/events/eventsbus.go b/events/eventsbus.go new file mode 100644 index 00000000..cd0dca71 --- /dev/null +++ b/events/eventsbus.go @@ -0,0 +1,170 @@ +package events + +import ( + "fmt" + "strings" + "sync" + + "github.com/spiral/errors" +) + +type sub struct { + pattern string + w *wildcard + events chan<- Event +} + +type eventsBus struct { + sync.RWMutex + subscribers sync.Map + internalEvCh chan Event + stop chan struct{} +} + +func newEventsBus() *eventsBus { + return &eventsBus{ + internalEvCh: make(chan Event, 100), + stop: make(chan struct{}), + } +} + +/* +http.* <- +*/ + +// SubscribeAll for all RR events +// returns subscriptionID +func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + + if subIDTr == "" { + return errors.Str("subscriberID can't be empty") + } + + return eb.subscribe(subID, "*", ch) +} + +// SubscribeP pattern like "pluginName.EventType" +func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + patternTr := strings.Trim(pattern, " ") + + if subIDTr == "" || patternTr == "" { + return errors.Str("subscriberID or pattern can't be empty") + } + + return eb.subscribe(subID, pattern, ch) +} + +func (eb *eventsBus) Unsubscribe(subID string) { + eb.subscribers.Delete(subID) +} + +func (eb *eventsBus) UnsubscribeP(subID, pattern string) { + if sb, ok := eb.subscribers.Load(subID); ok { + eb.Lock() + defer eb.Unlock() + + sbArr := sb.([]*sub) + + for i := 0; i < len(sbArr); i++ { + if sbArr[i].pattern == pattern { + sbArr[i] = sbArr[len(sbArr)-1] + sbArr = sbArr[:len(sbArr)-1] + // replace with new array + eb.subscribers.Store(subID, sbArr) + return + } + } + } +} + +// Send sends event to the events bus +func (eb *eventsBus) Send(ev Event) { + // do not accept nil events + if ev == nil { + return + } + + eb.internalEvCh <- ev +} + +func (eb *eventsBus) Len() uint { + var ln uint + + eb.subscribers.Range(func(key, value interface{}) bool { + ln++ + return true + }) + + return ln +} + +func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error { + eb.Lock() + defer eb.Unlock() + w, err := newWildcard(pattern) + if err != nil { + return err + } + + if sb, ok := eb.subscribers.Load(subID); ok { + // at this point we are confident that sb is a []*sub type + subArr := sb.([]*sub) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil + } + + subArr := make([]*sub, 0, 5) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil +} + +func (eb *eventsBus) handleEvents() { + for { //nolint:gosimple + select { + case ev := <-eb.internalEvCh: + // http.WorkerError for example + wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String()) + + eb.subscribers.Range(func(key, value interface{}) bool { + vsub := value.([]*sub) + + for i := 0; i < len(vsub); i++ { + if vsub[i].w.match(wc) { + select { + case vsub[i].events <- ev: + return true + default: + return true + } + } + } + + return true + }) + } + } +} diff --git a/events/general.go b/events/general.go deleted file mode 100755 index 5cf13e10..00000000 --- a/events/general.go +++ /dev/null @@ -1,41 +0,0 @@ -package events - -import ( - "sync" -) - -const UnknownEventType string = "Unknown event type" - -// HandlerImpl helps to broadcast events to multiple listeners. -type HandlerImpl struct { - listeners []Listener - sync.RWMutex // all receivers should be pointers -} - -func NewEventsHandler() Handler { - return &HandlerImpl{listeners: make([]Listener, 0, 2)} -} - -// NumListeners returns number of event listeners. -func (eb *HandlerImpl) NumListeners() int { - eb.Lock() - defer eb.Unlock() - return len(eb.listeners) -} - -// AddListener registers new event listener. -func (eb *HandlerImpl) AddListener(listener Listener) { - eb.Lock() - defer eb.Unlock() - eb.listeners = append(eb.listeners, listener) -} - -// Push broadcast events across all event listeners. -func (eb *HandlerImpl) Push(e interface{}) { - // ReadLock here because we are not changing listeners - eb.RLock() - defer eb.RUnlock() - for k := range eb.listeners { - eb.listeners[k](e) - } -} diff --git a/events/grpc_event.go b/events/grpc_event.go deleted file mode 100644 index 31ff4957..00000000 --- a/events/grpc_event.go +++ /dev/null @@ -1,39 +0,0 @@ -package events - -import ( - "time" - - "google.golang.org/grpc" -) - -const ( - // EventUnaryCallOk represents success unary call response - EventUnaryCallOk G = iota + 13000 - - // EventUnaryCallErr raised when unary call ended with error - EventUnaryCallErr -) - -type G int64 - -func (ev G) String() string { - switch ev { - case EventUnaryCallOk: - return "EventUnaryCallOk" - case EventUnaryCallErr: - return "EventUnaryCallErr" - } - return UnknownEventType -} - -// JobEvent represent job event. -type GRPCEvent struct { - Event G - // Info contains unary call info. - Info *grpc.UnaryServerInfo - // Error associated with event. - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/events/init.go b/events/init.go new file mode 100644 index 00000000..25e92fc5 --- /dev/null +++ b/events/init.go @@ -0,0 +1,20 @@ +package events + +import ( + "sync" + + "github.com/google/uuid" +) + +var evBus *eventsBus +var onInit = &sync.Once{} + +func Bus() (*eventsBus, string) { + onInit.Do(func() { + evBus = newEventsBus() + go evBus.handleEvents() + }) + + // return events bus with subscriberID + return evBus, uuid.NewString() +} diff --git a/events/interface.go b/events/interface.go deleted file mode 100644 index 7d57e4d0..00000000 --- a/events/interface.go +++ /dev/null @@ -1,14 +0,0 @@ -package events - -// Handler interface -type Handler interface { - // NumListeners return number of active listeners - NumListeners() int - // AddListener adds lister to the publisher - AddListener(listener Listener) - // Push pushes event to the listeners - Push(e interface{}) -} - -// Listener .. (type alias) event listener listens for the events produced by worker, worker pool or other service. -type Listener func(event interface{}) diff --git a/events/jobs_events.go b/events/jobs_events.go deleted file mode 100644 index f65ede67..00000000 --- a/events/jobs_events.go +++ /dev/null @@ -1,81 +0,0 @@ -package events - -import ( - "time" -) - -const ( - // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK J = iota + 12000 - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipePaused when pipeline has been paused. - EventPipePaused - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventDriverReady thrown when broken is ready to accept/serve tasks. - EventDriverReady -) - -type J int64 - -func (ev J) String() string { - switch ev { - case EventPushOK: - return "EventPushOK" - case EventPushError: - return "EventPushError" - case EventJobStart: - return "EventJobStart" - case EventJobOK: - return "EventJobOK" - case EventJobError: - return "EventJobError" - case EventPipeActive: - return "EventPipeActive" - case EventPipeStopped: - return "EventPipeStopped" - case EventPipeError: - return "EventPipeError" - case EventDriverReady: - return "EventDriverReady" - case EventPipePaused: - return "EventPipePaused" - } - return UnknownEventType -} - -// JobEvent represent job event. -type JobEvent struct { - Event J - // String is job id. - ID string - // Pipeline name - Pipeline string - // Associated driver name (amqp, ephemeral, etc) - Driver string - // Error for the jobs/pipes errors - Error error - // event timings - Start time.Time - Elapsed time.Duration -} diff --git a/events/pool_events.go b/events/pool_events.go deleted file mode 100644 index eb28df6a..00000000 --- a/events/pool_events.go +++ /dev/null @@ -1,71 +0,0 @@ -package events - -const ( - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct P = iota + 10000 - - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - - // EventSupervisorError triggered when supervisor can not complete work. - EventSupervisorError - - // EventWorkerProcessExit triggered on process wait exit - EventWorkerProcessExit - - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL - - // EventPoolRestart triggered when pool restart is needed - EventPoolRestart -) - -type P int64 - -func (ev P) String() string { - switch ev { - case EventWorkerProcessExit: - return "EventWorkerProcessExit" - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventSupervisorError: - return "EventSupervisorError" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - case EventPoolRestart: - return "EventPoolRestart" - } - return UnknownEventType -} - -// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. -type PoolEvent struct { - // Event type, see below. - Event P - - // Payload depends on event type, typically it's worker or error. - Payload interface{} - Error error -} diff --git a/events/types.go b/events/types.go new file mode 100644 index 00000000..65a76d15 --- /dev/null +++ b/events/types.go @@ -0,0 +1,46 @@ +package events + +type EventBus interface { + SubscribeAll(subID string, ch chan<- Event) error + SubscribeP(subID string, pattern string, ch chan<- Event) error + Unsubscribe(subID string) + UnsubscribeP(subID, pattern string) + Len() uint + Send(ev Event) +} + +type Event interface { + Plugin() string + Type() EventType + Message() string +} + +type RREvent struct { + // event typ + typ EventType + // plugin + plugin string + // message + message string +} + +// NewEvent initializes new event +func NewEvent(t EventType, plugin string, msg string) *RREvent { + return &RREvent{ + typ: t, + plugin: plugin, + message: msg, + } +} + +func (r *RREvent) Type() EventType { + return r.typ +} + +func (r *RREvent) Message() string { + return r.message +} + +func (r *RREvent) Plugin() string { + return r.plugin +} diff --git a/events/wildcard.go b/events/wildcard.go new file mode 100644 index 00000000..b4c28ae1 --- /dev/null +++ b/events/wildcard.go @@ -0,0 +1,43 @@ +package events + +import ( + "strings" + + "github.com/spiral/errors" +) + +type wildcard struct { + prefix string + suffix string +} + +func newWildcard(pattern string) (*wildcard, error) { + // Normalize + origin := strings.ToLower(pattern) + i := strings.IndexByte(origin, '*') + + /* + http.* + * + *.WorkerError + */ + if i == -1 { + dotI := strings.IndexByte(pattern, '.') + + if dotI == -1 { + // http.SuperEvent + return nil, errors.Str("wrong wildcard, no * or . Usage: http.Event or *.Event or http.*") + } + + return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil + } + + // pref: http. + // suff: * + return &wildcard{origin[0:i], origin[i+1:]}, nil +} + +func (w wildcard) match(s string) bool { + s = strings.ToLower(s) + return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix) +} diff --git a/events/wildcard_test.go b/events/wildcard_test.go new file mode 100644 index 00000000..230ef673 --- /dev/null +++ b/events/wildcard_test.go @@ -0,0 +1,48 @@ +package events + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWildcard(t *testing.T) { + w, err := newWildcard("http.*") + assert.NoError(t, err) + assert.True(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.True(t, w.match("http.****")) + + // *.* -> * + w, err = newWildcard("*") + assert.NoError(t, err) + assert.True(t, w.match("http.SuperEvent")) + assert.True(t, w.match("https.SuperEvent")) + assert.True(t, w.match("")) + assert.True(t, w.match("*")) + assert.True(t, w.match("****")) + assert.True(t, w.match("http.****")) + + w, err = newWildcard("*.WorkerError") + assert.NoError(t, err) + assert.False(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.False(t, w.match("http.****")) + assert.True(t, w.match("http.WorkerError")) + + w, err = newWildcard("http.WorkerError") + assert.NoError(t, err) + assert.False(t, w.match("http.SuperEvent")) + assert.False(t, w.match("https.SuperEvent")) + assert.False(t, w.match("")) + assert.False(t, w.match("*")) + assert.False(t, w.match("****")) + assert.False(t, w.match("http.****")) + assert.True(t, w.match("http.WorkerError")) +} diff --git a/events/worker_events.go b/events/worker_events.go deleted file mode 100644 index 6b80df61..00000000 --- a/events/worker_events.go +++ /dev/null @@ -1,40 +0,0 @@ -package events - -const ( - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError W = iota + 11000 - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog - // EventWorkerStderr is the worker standard error output - EventWorkerStderr - // EventWorkerWaitExit is the worker exit event - EventWorkerWaitExit -) - -type W int64 - -func (ev W) String() string { - switch ev { - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - case EventWorkerStderr: - return "EventWorkerStderr" - case EventWorkerWaitExit: - return "EventWorkerWaitExit" - } - return UnknownEventType -} - -// WorkerEvent wraps worker events. -type WorkerEvent struct { - // Event id, see below. - Event W - - // Worker triggered the event. - Worker interface{} - - // Event specific payload. - Payload interface{} -} |