diff options
Diffstat (limited to 'events')
-rw-r--r-- | events/docs/events.md | 136 | ||||
-rw-r--r-- | events/events.go | 61 | ||||
-rw-r--r-- | events/events_test.go | 218 | ||||
-rw-r--r-- | events/eventsbus.go | 170 | ||||
-rw-r--r-- | events/init.go | 20 | ||||
-rw-r--r-- | events/types.go | 54 | ||||
-rw-r--r-- | events/wildcard.go | 43 | ||||
-rw-r--r-- | events/wildcard_test.go | 54 |
8 files changed, 0 insertions, 756 deletions
diff --git a/events/docs/events.md b/events/docs/events.md deleted file mode 100644 index 37059b25..00000000 --- a/events/docs/events.md +++ /dev/null @@ -1,136 +0,0 @@ -## RoadRunner Events bus - -RR events bus might be useful when one plugin raises some event on which another plugin should react. For example, -plugins like sentry might log errors from the `http` plugin. - -## Simple subscription - -Events bus supports wildcard subscriptions on the events as well as the direct subscriptions on the particular event. - -Let's have a look at the simple example: - -```go -package foo - -import ( - "github.com/spiral/roadrunner/v2/events" -) - -func foo() { - eh, id := events.Bus() - defer eh.Unsubscribe(id) - - ch := make(chan events.Event, 100) - err := eh.SubscribeP(id, "http.EventJobOK", ch) - if err != nil { - panic(err) - } - - eh.Send(events.NewEvent(events.EventJobOK, "http", "foo")) - evt := <-ch - // evt.Message() -> "foo" - // evt.Plugin() -> "http" - // evt.Type().String() -> "EventJobOK" -} -``` - -Here: -1. `eh, id := events.Bus()` get the instance (it's global) of the events bus. Make sure to unsubscribe event handler when you don't need it anymore: `eh.Unsubscribe(id)`. -2. `ch := make(chan events.Event, 100)` create an events channel. -3. `err := eh.SubscribeP(id, "http.EventJobOK", ch)` subscribe to the events which fits your pattern (`http.EventJobOK`). -4. `eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))` emit event from the any plugin. -5. `evt := <-ch` get the event. - -Notes: -1. If you use only `eh.Send` events bus function, you don't need to unsubscribe, so, you may simplify the declaration to the `eh, _ := events.Bus()`. - -## Wildcards - -As mentioned before, RR events bus supports wildcards subscriptions, like: `*.SomeEvent`, `http.*`, `http.Some*`, `*`. -Let's have a look at the next sample of code: - -```go -package foo - -import ( - "github.com/spiral/roadrunner/v2/events" -) - -func foo() { - eh, id := events.Bus() - defer eh.Unsubscribe(id) - - ch := make(chan events.Event, 100) - err := eh.SubscribeP(id, "http.*", ch) - if err != nil { - panic(err) - } - - eh.Send(events.NewEvent(events.EventJobOK, "http", "foo")) - evt := <-ch - // evt.Message() -> "foo" - // evt.Plugin() -> "http" - // evt.Type().String() -> "EventJobOK" -} -``` - -One change between these samples is in the `SubscribeP` pattern: `err := eh.SubscribeP(id, "http.*", ch)`. Here we used `http.*` instead of `http.EventJobOK`. - -You also have the access to the event message, plugin and type. Message is a custom, user-defined message to log or to show to the subscriber. Plugin is a source plugin who raised this event. And the event type - is your custom or RR event type. - - -## How to implement custom event - -Event type is a `fmt.Stringer`. That means, that your custom event type should implement this interface. Let's have a look at how to do that: - -```go -package foo - -type MySuperEvent uint32 - -const ( - EventHTTPError MySuperEvent = iota -) - -func (mse MySuperEvent) String() string { - switch mse { - case EventHTTPError: - return "EventHTTPError" - default: - return "UnknownEventType" - } -} -``` - -Here we defined a custom type - `MySuperEvent`. For sure, it might have any name you want and represent for example some domain field like `WorkersPoolEvent` represents RR sync.pool events. Then you need to implement a `fmt.Stringer` on your custom event type. -Next you need to create an enum with the actual events and that's it. - -How to use that: -```go -package foo - -import ( - "github.com/spiral/roadrunner/v2/events" -) - -func foo() { - eh, id := events.Bus() - defer eh.Unsubscribe(id) - - ch := make(chan events.Event, 100) - err := eh.SubscribeP(id, "http.EventHTTPError", ch) - if err != nil { - panic(err) - } - - // first arg of the NewEvent method is fmt.Stringer - eh.Send(events.NewEvent(EventHTTPError, "http", "foo")) - evt := <-ch - // evt.Message() -> "foo" - // evt.Plugin() -> "http" - // evt.Type().String() -> "EventHTTPError" -} -``` - -Important note: you don't need to import your custom event types into the subscriber. You only need to know the name of that event and pass a string to the subscriber. - diff --git a/events/events.go b/events/events.go deleted file mode 100644 index 42519637..00000000 --- a/events/events.go +++ /dev/null @@ -1,61 +0,0 @@ -package events - -type EventType uint32 - -const ( - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct EventType = iota - // EventWorkerDestruct thrown after worker destruction. - EventWorkerDestruct - // EventWorkerProcessExit triggered on process wait exit - EventWorkerProcessExit - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed - EventNoFreeWorkers - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) - EventTTL - // EventIdleTTL triggered when worker spends too much time at rest. - EventIdleTTL - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). - EventExecTTL - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError - // EventWorkerStderr is the worker standard error output - EventWorkerStderr - // EventWorkerWaitExit is the worker exit event - EventWorkerWaitExit - // EventWorkerStopped triggered when worker gracefully stopped - EventWorkerStopped -) - -func (et EventType) String() string { - switch et { - case EventWorkerProcessExit: - return "EventWorkerProcessExit" - case EventWorkerConstruct: - return "EventWorkerConstruct" - case EventWorkerDestruct: - return "EventWorkerDestruct" - case EventNoFreeWorkers: - return "EventNoFreeWorkers" - case EventMaxMemory: - return "EventMaxMemory" - case EventTTL: - return "EventTTL" - case EventIdleTTL: - return "EventIdleTTL" - case EventExecTTL: - return "EventExecTTL" - case EventWorkerError: - return "EventWorkerError" - case EventWorkerStderr: - return "EventWorkerStderr" - case EventWorkerWaitExit: - return "EventWorkerWaitExit" - case EventWorkerStopped: - return "EventWorkerStopped" - default: - return "UnknownEventType" - } -} diff --git a/events/events_test.go b/events/events_test.go deleted file mode 100644 index 62ddd903..00000000 --- a/events/events_test.go +++ /dev/null @@ -1,218 +0,0 @@ -package events - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestEvenHandler(t *testing.T) { - eh, id := Bus() - defer eh.Unsubscribe(id) - - ch := make(chan Event, 100) - err := eh.SubscribeP(id, "http.EventWorkerError", ch) - require.NoError(t, err) - - eh.Send(NewEvent(EventWorkerError, "http", "foo")) - - evt := <-ch - require.Equal(t, "foo", evt.Message()) - require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventWorkerError", evt.Type().String()) - - eh.Unsubscribe(id) -} - -func TestEvenHandler2(t *testing.T) { - eh, id := Bus() - eh2, id2 := Bus() - defer eh.Unsubscribe(id) - defer eh2.Unsubscribe(id2) - - ch := make(chan Event, 100) - ch2 := make(chan Event, 100) - err := eh2.SubscribeP(id2, "http.EventWorkerError", ch) - require.NoError(t, err) - - err = eh.SubscribeP(id, "http.EventWorkerError", ch2) - require.NoError(t, err) - - eh.Send(NewEvent(EventWorkerError, "http", "foo")) - - evt := <-ch2 - require.Equal(t, "foo", evt.Message()) - require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventWorkerError", evt.Type().String()) - - l := eh.Len() - require.Equal(t, uint(2), l) - - eh.Unsubscribe(id) - time.Sleep(time.Second) - - l = eh.Len() - require.Equal(t, uint(1), l) - - eh2.Unsubscribe(id2) - time.Sleep(time.Second) - - l = eh.Len() - require.Equal(t, uint(0), l) - - eh.Unsubscribe(id) - eh2.Unsubscribe(id2) -} - -func TestEvenHandler3(t *testing.T) { - eh, id := Bus() - defer eh.Unsubscribe(id) - - ch := make(chan Event, 100) - err := eh.SubscribeP(id, "EventWorkerError", ch) - require.Error(t, err) - - eh.Unsubscribe(id) -} - -func TestEvenHandler4(t *testing.T) { - eh, id := Bus() - defer eh.Unsubscribe(id) - - err := eh.SubscribeP(id, "EventWorkerError", nil) - require.Error(t, err) - - eh.Unsubscribe(id) -} - -func TestEvenHandler5(t *testing.T) { - eh, id := Bus() - defer eh.Unsubscribe(id) - - ch := make(chan Event, 100) - err := eh.SubscribeP(id, "http.EventWorkerError", ch) - require.NoError(t, err) - - eh.Send(NewEvent(EventWorkerError, "http", "foo")) - - evt := <-ch - require.Equal(t, "foo", evt.Message()) - require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventWorkerError", evt.Type().String()) - - eh.Unsubscribe(id) -} - -type MySuperEvent uint32 - -const ( - // EventHTTPError represents success unary call response - EventHTTPError MySuperEvent = iota -) - -func (mse MySuperEvent) String() string { - switch mse { - case EventHTTPError: - return "EventHTTPError" - default: - return "UnknownEventType" - } -} - -func TestEvenHandler6(t *testing.T) { - eh, id := Bus() - defer eh.Unsubscribe(id) - - ch := make(chan Event, 100) - err := eh.SubscribeP(id, "http.EventHTTPError", ch) - require.NoError(t, err) - - eh.Send(NewEvent(EventHTTPError, "http", "foo")) - - evt := <-ch - require.Equal(t, "foo", evt.Message()) - require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventHTTPError", evt.Type().String()) - - eh.Unsubscribe(id) -} - -func TestEvenHandler7(t *testing.T) { - eh, id := Bus() - defer eh.Unsubscribe(id) - - ch := make(chan Event, 100) - err := eh.SubscribeAll(id, ch) - require.NoError(t, err) - - eh.Send(NewEvent(EventHTTPError, "http", "foo")) - - evt := <-ch - require.Equal(t, "foo", evt.Message()) - require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventHTTPError", evt.Type().String()) - - eh.Unsubscribe(id) -} - -func TestEvenHandler8(t *testing.T) { - eh, id := Bus() - defer eh.Unsubscribe(id) - - err := eh.SubscribeAll(id, nil) - require.Error(t, err) - - eh.Unsubscribe(id) -} - -func TestEvenHandler9(t *testing.T) { - eh, id := Bus() - defer eh.Unsubscribe(id) - - ch := make(chan Event, 100) - err := eh.SubscribeP(id, "http.EventWorkerError", ch) - require.NoError(t, err) - - eh.Send(NewEvent(EventWorkerError, "http", "foo")) - - evt := <-ch - require.Equal(t, "foo", evt.Message()) - require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventWorkerError", evt.Type().String()) - - eh.UnsubscribeP(id, "http.EventWorkerError") - - eh.Send(NewEvent(EventWorkerError, "http", "foo")) - - select { - case <-ch: - require.Fail(t, "should not read any events") - default: - return - } -} - -func TestEvenHandler10(t *testing.T) { - eh, id := Bus() - defer eh.Unsubscribe(id) - - ch := make(chan Event, 100) - err := eh.SubscribeP(id, "http.EventHTTPError", ch) - require.NoError(t, err) - err = eh.SubscribeP(id, "http.Foo", ch) - require.NoError(t, err) - err = eh.SubscribeP(id, "http.Foo2", ch) - require.NoError(t, err) - err = eh.SubscribeP(id, "http.Foo2", ch) - require.NoError(t, err) - - eh.Send(NewEvent(EventHTTPError, "http", "foo")) - - evt := <-ch - require.Equal(t, "foo", evt.Message()) - require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventHTTPError", evt.Type().String()) - - eh.Unsubscribe(id) -} diff --git a/events/eventsbus.go b/events/eventsbus.go deleted file mode 100644 index a2a2c859..00000000 --- a/events/eventsbus.go +++ /dev/null @@ -1,170 +0,0 @@ -package events - -import ( - "fmt" - "strings" - "sync" - - "github.com/spiral/errors" -) - -type sub struct { - pattern string - w *wildcard - events chan<- Event -} - -type eventsBus struct { - sync.RWMutex - subscribers sync.Map - internalEvCh chan Event - stop chan struct{} -} - -func newEventsBus() *eventsBus { - return &eventsBus{ - internalEvCh: make(chan Event, 100), - stop: make(chan struct{}), - } -} - -/* -http.* <- -*/ - -// SubscribeAll for all RR events -// returns subscriptionID -func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error { - if ch == nil { - return errors.Str("nil channel provided") - } - - subIDTr := strings.Trim(subID, " ") - - if subIDTr == "" { - return errors.Str("subscriberID can't be empty") - } - - return eb.subscribe(subID, "*", ch) -} - -// SubscribeP pattern like "pluginName.EventType" -func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error { - if ch == nil { - return errors.Str("nil channel provided") - } - - subIDTr := strings.Trim(subID, " ") - patternTr := strings.Trim(pattern, " ") - - if subIDTr == "" || patternTr == "" { - return errors.Str("subscriberID or pattern can't be empty") - } - - return eb.subscribe(subID, pattern, ch) -} - -func (eb *eventsBus) Unsubscribe(subID string) { - eb.subscribers.Delete(subID) -} - -func (eb *eventsBus) UnsubscribeP(subID, pattern string) { - if sb, ok := eb.subscribers.Load(subID); ok { - eb.Lock() - defer eb.Unlock() - - sbArr := sb.([]*sub) - - for i := 0; i < len(sbArr); i++ { - if sbArr[i].pattern == pattern { - sbArr[i] = sbArr[len(sbArr)-1] - sbArr = sbArr[:len(sbArr)-1] - // replace with new array - eb.subscribers.Store(subID, sbArr) - return - } - } - } -} - -// Send sends event to the events bus -func (eb *eventsBus) Send(ev Event) { - // do not accept nil events - if ev == nil { - return - } - - eb.internalEvCh <- ev -} - -func (eb *eventsBus) Len() uint { - var ln uint - - eb.subscribers.Range(func(key, value interface{}) bool { - ln++ - return true - }) - - return ln -} - -func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error { - eb.Lock() - defer eb.Unlock() - w, err := newWildcard(pattern) - if err != nil { - return err - } - - if sb, ok := eb.subscribers.Load(subID); ok { - // at this point we are confident that sb is a []*sub type - subArr := sb.([]*sub) - subArr = append(subArr, &sub{ - pattern: pattern, - w: w, - events: ch, - }) - - eb.subscribers.Store(subID, subArr) - - return nil - } - - subArr := make([]*sub, 0, 1) - subArr = append(subArr, &sub{ - pattern: pattern, - w: w, - events: ch, - }) - - eb.subscribers.Store(subID, subArr) - - return nil -} - -func (eb *eventsBus) handleEvents() { - for { //nolint:gosimple - select { - case ev := <-eb.internalEvCh: - // http.WorkerError for example - wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String()) - - eb.subscribers.Range(func(key, value interface{}) bool { - vsub := value.([]*sub) - - for i := 0; i < len(vsub); i++ { - if vsub[i].w.match(wc) { - select { - case vsub[i].events <- ev: - return true - default: - return true - } - } - } - - return true - }) - } - } -} diff --git a/events/init.go b/events/init.go deleted file mode 100644 index 25e92fc5..00000000 --- a/events/init.go +++ /dev/null @@ -1,20 +0,0 @@ -package events - -import ( - "sync" - - "github.com/google/uuid" -) - -var evBus *eventsBus -var onInit = &sync.Once{} - -func Bus() (*eventsBus, string) { - onInit.Do(func() { - evBus = newEventsBus() - go evBus.handleEvents() - }) - - // return events bus with subscriberID - return evBus, uuid.NewString() -} diff --git a/events/types.go b/events/types.go deleted file mode 100644 index 806e81ce..00000000 --- a/events/types.go +++ /dev/null @@ -1,54 +0,0 @@ -package events - -import ( - "fmt" -) - -type EventBus interface { - SubscribeAll(subID string, ch chan<- Event) error - SubscribeP(subID string, pattern string, ch chan<- Event) error - Unsubscribe(subID string) - UnsubscribeP(subID, pattern string) - Len() uint - Send(ev Event) -} - -type Event interface { - Type() fmt.Stringer - Plugin() string - Message() string -} - -type event struct { - // event typ - typ fmt.Stringer - // plugin - plugin string - // message - message string -} - -// NewEvent initializes new event -func NewEvent(t fmt.Stringer, plugin string, message string) *event { - if t.String() == "" || plugin == "" { - return nil - } - - return &event{ - typ: t, - plugin: plugin, - message: message, - } -} - -func (r *event) Type() fmt.Stringer { - return r.typ -} - -func (r *event) Message() string { - return r.message -} - -func (r *event) Plugin() string { - return r.plugin -} diff --git a/events/wildcard.go b/events/wildcard.go deleted file mode 100644 index b4c28ae1..00000000 --- a/events/wildcard.go +++ /dev/null @@ -1,43 +0,0 @@ -package events - -import ( - "strings" - - "github.com/spiral/errors" -) - -type wildcard struct { - prefix string - suffix string -} - -func newWildcard(pattern string) (*wildcard, error) { - // Normalize - origin := strings.ToLower(pattern) - i := strings.IndexByte(origin, '*') - - /* - http.* - * - *.WorkerError - */ - if i == -1 { - dotI := strings.IndexByte(pattern, '.') - - if dotI == -1 { - // http.SuperEvent - return nil, errors.Str("wrong wildcard, no * or . Usage: http.Event or *.Event or http.*") - } - - return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil - } - - // pref: http. - // suff: * - return &wildcard{origin[0:i], origin[i+1:]}, nil -} - -func (w wildcard) match(s string) bool { - s = strings.ToLower(s) - return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix) -} diff --git a/events/wildcard_test.go b/events/wildcard_test.go deleted file mode 100644 index dfa65e63..00000000 --- a/events/wildcard_test.go +++ /dev/null @@ -1,54 +0,0 @@ -package events - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestWildcard(t *testing.T) { - w, err := newWildcard("http.*") - assert.NoError(t, err) - assert.True(t, w.match("http.SuperEvent")) - assert.False(t, w.match("https.SuperEvent")) - assert.False(t, w.match("")) - assert.False(t, w.match("*")) - assert.False(t, w.match("****")) - assert.True(t, w.match("http.****")) - - // *.* -> * - w, err = newWildcard("*") - assert.NoError(t, err) - assert.True(t, w.match("http.SuperEvent")) - assert.True(t, w.match("https.SuperEvent")) - assert.True(t, w.match("")) - assert.True(t, w.match("*")) - assert.True(t, w.match("****")) - assert.True(t, w.match("http.****")) - - w, err = newWildcard("*.WorkerError") - assert.NoError(t, err) - assert.False(t, w.match("http.SuperEvent")) - assert.False(t, w.match("https.SuperEvent")) - assert.False(t, w.match("")) - assert.False(t, w.match("*")) - assert.False(t, w.match("****")) - assert.False(t, w.match("http.****")) - assert.True(t, w.match("http.WorkerError")) - - w, err = newWildcard("http.WorkerError") - assert.NoError(t, err) - assert.False(t, w.match("http.SuperEvent")) - assert.False(t, w.match("https.SuperEvent")) - assert.False(t, w.match("")) - assert.False(t, w.match("*")) - assert.False(t, w.match("****")) - assert.False(t, w.match("http.****")) - assert.True(t, w.match("http.WorkerError")) - - w, err = newWildcard("http.Worker*") - assert.NoError(t, err) - assert.True(t, w.match("http.WorkerFoo")) - assert.False(t, w.match("h*.SuperEvent")) - assert.False(t, w.match("h*.Worker")) -} |