summaryrefslogtreecommitdiff
path: root/events
diff options
context:
space:
mode:
Diffstat (limited to 'events')
-rw-r--r--events/docs/events.md136
-rw-r--r--events/events.go61
-rw-r--r--events/events_test.go218
-rw-r--r--events/eventsbus.go170
-rw-r--r--events/init.go20
-rw-r--r--events/types.go54
-rw-r--r--events/wildcard.go43
-rw-r--r--events/wildcard_test.go54
8 files changed, 0 insertions, 756 deletions
diff --git a/events/docs/events.md b/events/docs/events.md
deleted file mode 100644
index 37059b25..00000000
--- a/events/docs/events.md
+++ /dev/null
@@ -1,136 +0,0 @@
-## RoadRunner Events bus
-
-RR events bus might be useful when one plugin raises some event on which another plugin should react. For example,
-plugins like sentry might log errors from the `http` plugin.
-
-## Simple subscription
-
-Events bus supports wildcard subscriptions on the events as well as the direct subscriptions on the particular event.
-
-Let's have a look at the simple example:
-
-```go
-package foo
-
-import (
- "github.com/spiral/roadrunner/v2/events"
-)
-
-func foo() {
- eh, id := events.Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan events.Event, 100)
- err := eh.SubscribeP(id, "http.EventJobOK", ch)
- if err != nil {
- panic(err)
- }
-
- eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))
- evt := <-ch
- // evt.Message() -> "foo"
- // evt.Plugin() -> "http"
- // evt.Type().String() -> "EventJobOK"
-}
-```
-
-Here:
-1. `eh, id := events.Bus()` get the instance (it's global) of the events bus. Make sure to unsubscribe event handler when you don't need it anymore: `eh.Unsubscribe(id)`.
-2. `ch := make(chan events.Event, 100)` create an events channel.
-3. `err := eh.SubscribeP(id, "http.EventJobOK", ch)` subscribe to the events which fits your pattern (`http.EventJobOK`).
-4. `eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))` emit event from the any plugin.
-5. `evt := <-ch` get the event.
-
-Notes:
-1. If you use only `eh.Send` events bus function, you don't need to unsubscribe, so, you may simplify the declaration to the `eh, _ := events.Bus()`.
-
-## Wildcards
-
-As mentioned before, RR events bus supports wildcards subscriptions, like: `*.SomeEvent`, `http.*`, `http.Some*`, `*`.
-Let's have a look at the next sample of code:
-
-```go
-package foo
-
-import (
- "github.com/spiral/roadrunner/v2/events"
-)
-
-func foo() {
- eh, id := events.Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan events.Event, 100)
- err := eh.SubscribeP(id, "http.*", ch)
- if err != nil {
- panic(err)
- }
-
- eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))
- evt := <-ch
- // evt.Message() -> "foo"
- // evt.Plugin() -> "http"
- // evt.Type().String() -> "EventJobOK"
-}
-```
-
-One change between these samples is in the `SubscribeP` pattern: `err := eh.SubscribeP(id, "http.*", ch)`. Here we used `http.*` instead of `http.EventJobOK`.
-
-You also have the access to the event message, plugin and type. Message is a custom, user-defined message to log or to show to the subscriber. Plugin is a source plugin who raised this event. And the event type - is your custom or RR event type.
-
-
-## How to implement custom event
-
-Event type is a `fmt.Stringer`. That means, that your custom event type should implement this interface. Let's have a look at how to do that:
-
-```go
-package foo
-
-type MySuperEvent uint32
-
-const (
- EventHTTPError MySuperEvent = iota
-)
-
-func (mse MySuperEvent) String() string {
- switch mse {
- case EventHTTPError:
- return "EventHTTPError"
- default:
- return "UnknownEventType"
- }
-}
-```
-
-Here we defined a custom type - `MySuperEvent`. For sure, it might have any name you want and represent for example some domain field like `WorkersPoolEvent` represents RR sync.pool events. Then you need to implement a `fmt.Stringer` on your custom event type.
-Next you need to create an enum with the actual events and that's it.
-
-How to use that:
-```go
-package foo
-
-import (
- "github.com/spiral/roadrunner/v2/events"
-)
-
-func foo() {
- eh, id := events.Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan events.Event, 100)
- err := eh.SubscribeP(id, "http.EventHTTPError", ch)
- if err != nil {
- panic(err)
- }
-
- // first arg of the NewEvent method is fmt.Stringer
- eh.Send(events.NewEvent(EventHTTPError, "http", "foo"))
- evt := <-ch
- // evt.Message() -> "foo"
- // evt.Plugin() -> "http"
- // evt.Type().String() -> "EventHTTPError"
-}
-```
-
-Important note: you don't need to import your custom event types into the subscriber. You only need to know the name of that event and pass a string to the subscriber.
-
diff --git a/events/events.go b/events/events.go
deleted file mode 100644
index 42519637..00000000
--- a/events/events.go
+++ /dev/null
@@ -1,61 +0,0 @@
-package events
-
-type EventType uint32
-
-const (
- // EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct EventType = iota
- // EventWorkerDestruct thrown after worker destruction.
- EventWorkerDestruct
- // EventWorkerProcessExit triggered on process wait exit
- EventWorkerProcessExit
- // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
- EventNoFreeWorkers
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory
- // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
- EventTTL
- // EventIdleTTL triggered when worker spends too much time at rest.
- EventIdleTTL
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
- EventExecTTL
- // EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError
- // EventWorkerStderr is the worker standard error output
- EventWorkerStderr
- // EventWorkerWaitExit is the worker exit event
- EventWorkerWaitExit
- // EventWorkerStopped triggered when worker gracefully stopped
- EventWorkerStopped
-)
-
-func (et EventType) String() string {
- switch et {
- case EventWorkerProcessExit:
- return "EventWorkerProcessExit"
- case EventWorkerConstruct:
- return "EventWorkerConstruct"
- case EventWorkerDestruct:
- return "EventWorkerDestruct"
- case EventNoFreeWorkers:
- return "EventNoFreeWorkers"
- case EventMaxMemory:
- return "EventMaxMemory"
- case EventTTL:
- return "EventTTL"
- case EventIdleTTL:
- return "EventIdleTTL"
- case EventExecTTL:
- return "EventExecTTL"
- case EventWorkerError:
- return "EventWorkerError"
- case EventWorkerStderr:
- return "EventWorkerStderr"
- case EventWorkerWaitExit:
- return "EventWorkerWaitExit"
- case EventWorkerStopped:
- return "EventWorkerStopped"
- default:
- return "UnknownEventType"
- }
-}
diff --git a/events/events_test.go b/events/events_test.go
deleted file mode 100644
index 62ddd903..00000000
--- a/events/events_test.go
+++ /dev/null
@@ -1,218 +0,0 @@
-package events
-
-import (
- "testing"
- "time"
-
- "github.com/stretchr/testify/require"
-)
-
-func TestEvenHandler(t *testing.T) {
- eh, id := Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan Event, 100)
- err := eh.SubscribeP(id, "http.EventWorkerError", ch)
- require.NoError(t, err)
-
- eh.Send(NewEvent(EventWorkerError, "http", "foo"))
-
- evt := <-ch
- require.Equal(t, "foo", evt.Message())
- require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventWorkerError", evt.Type().String())
-
- eh.Unsubscribe(id)
-}
-
-func TestEvenHandler2(t *testing.T) {
- eh, id := Bus()
- eh2, id2 := Bus()
- defer eh.Unsubscribe(id)
- defer eh2.Unsubscribe(id2)
-
- ch := make(chan Event, 100)
- ch2 := make(chan Event, 100)
- err := eh2.SubscribeP(id2, "http.EventWorkerError", ch)
- require.NoError(t, err)
-
- err = eh.SubscribeP(id, "http.EventWorkerError", ch2)
- require.NoError(t, err)
-
- eh.Send(NewEvent(EventWorkerError, "http", "foo"))
-
- evt := <-ch2
- require.Equal(t, "foo", evt.Message())
- require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventWorkerError", evt.Type().String())
-
- l := eh.Len()
- require.Equal(t, uint(2), l)
-
- eh.Unsubscribe(id)
- time.Sleep(time.Second)
-
- l = eh.Len()
- require.Equal(t, uint(1), l)
-
- eh2.Unsubscribe(id2)
- time.Sleep(time.Second)
-
- l = eh.Len()
- require.Equal(t, uint(0), l)
-
- eh.Unsubscribe(id)
- eh2.Unsubscribe(id2)
-}
-
-func TestEvenHandler3(t *testing.T) {
- eh, id := Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan Event, 100)
- err := eh.SubscribeP(id, "EventWorkerError", ch)
- require.Error(t, err)
-
- eh.Unsubscribe(id)
-}
-
-func TestEvenHandler4(t *testing.T) {
- eh, id := Bus()
- defer eh.Unsubscribe(id)
-
- err := eh.SubscribeP(id, "EventWorkerError", nil)
- require.Error(t, err)
-
- eh.Unsubscribe(id)
-}
-
-func TestEvenHandler5(t *testing.T) {
- eh, id := Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan Event, 100)
- err := eh.SubscribeP(id, "http.EventWorkerError", ch)
- require.NoError(t, err)
-
- eh.Send(NewEvent(EventWorkerError, "http", "foo"))
-
- evt := <-ch
- require.Equal(t, "foo", evt.Message())
- require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventWorkerError", evt.Type().String())
-
- eh.Unsubscribe(id)
-}
-
-type MySuperEvent uint32
-
-const (
- // EventHTTPError represents success unary call response
- EventHTTPError MySuperEvent = iota
-)
-
-func (mse MySuperEvent) String() string {
- switch mse {
- case EventHTTPError:
- return "EventHTTPError"
- default:
- return "UnknownEventType"
- }
-}
-
-func TestEvenHandler6(t *testing.T) {
- eh, id := Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan Event, 100)
- err := eh.SubscribeP(id, "http.EventHTTPError", ch)
- require.NoError(t, err)
-
- eh.Send(NewEvent(EventHTTPError, "http", "foo"))
-
- evt := <-ch
- require.Equal(t, "foo", evt.Message())
- require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventHTTPError", evt.Type().String())
-
- eh.Unsubscribe(id)
-}
-
-func TestEvenHandler7(t *testing.T) {
- eh, id := Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan Event, 100)
- err := eh.SubscribeAll(id, ch)
- require.NoError(t, err)
-
- eh.Send(NewEvent(EventHTTPError, "http", "foo"))
-
- evt := <-ch
- require.Equal(t, "foo", evt.Message())
- require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventHTTPError", evt.Type().String())
-
- eh.Unsubscribe(id)
-}
-
-func TestEvenHandler8(t *testing.T) {
- eh, id := Bus()
- defer eh.Unsubscribe(id)
-
- err := eh.SubscribeAll(id, nil)
- require.Error(t, err)
-
- eh.Unsubscribe(id)
-}
-
-func TestEvenHandler9(t *testing.T) {
- eh, id := Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan Event, 100)
- err := eh.SubscribeP(id, "http.EventWorkerError", ch)
- require.NoError(t, err)
-
- eh.Send(NewEvent(EventWorkerError, "http", "foo"))
-
- evt := <-ch
- require.Equal(t, "foo", evt.Message())
- require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventWorkerError", evt.Type().String())
-
- eh.UnsubscribeP(id, "http.EventWorkerError")
-
- eh.Send(NewEvent(EventWorkerError, "http", "foo"))
-
- select {
- case <-ch:
- require.Fail(t, "should not read any events")
- default:
- return
- }
-}
-
-func TestEvenHandler10(t *testing.T) {
- eh, id := Bus()
- defer eh.Unsubscribe(id)
-
- ch := make(chan Event, 100)
- err := eh.SubscribeP(id, "http.EventHTTPError", ch)
- require.NoError(t, err)
- err = eh.SubscribeP(id, "http.Foo", ch)
- require.NoError(t, err)
- err = eh.SubscribeP(id, "http.Foo2", ch)
- require.NoError(t, err)
- err = eh.SubscribeP(id, "http.Foo2", ch)
- require.NoError(t, err)
-
- eh.Send(NewEvent(EventHTTPError, "http", "foo"))
-
- evt := <-ch
- require.Equal(t, "foo", evt.Message())
- require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventHTTPError", evt.Type().String())
-
- eh.Unsubscribe(id)
-}
diff --git a/events/eventsbus.go b/events/eventsbus.go
deleted file mode 100644
index a2a2c859..00000000
--- a/events/eventsbus.go
+++ /dev/null
@@ -1,170 +0,0 @@
-package events
-
-import (
- "fmt"
- "strings"
- "sync"
-
- "github.com/spiral/errors"
-)
-
-type sub struct {
- pattern string
- w *wildcard
- events chan<- Event
-}
-
-type eventsBus struct {
- sync.RWMutex
- subscribers sync.Map
- internalEvCh chan Event
- stop chan struct{}
-}
-
-func newEventsBus() *eventsBus {
- return &eventsBus{
- internalEvCh: make(chan Event, 100),
- stop: make(chan struct{}),
- }
-}
-
-/*
-http.* <-
-*/
-
-// SubscribeAll for all RR events
-// returns subscriptionID
-func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error {
- if ch == nil {
- return errors.Str("nil channel provided")
- }
-
- subIDTr := strings.Trim(subID, " ")
-
- if subIDTr == "" {
- return errors.Str("subscriberID can't be empty")
- }
-
- return eb.subscribe(subID, "*", ch)
-}
-
-// SubscribeP pattern like "pluginName.EventType"
-func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error {
- if ch == nil {
- return errors.Str("nil channel provided")
- }
-
- subIDTr := strings.Trim(subID, " ")
- patternTr := strings.Trim(pattern, " ")
-
- if subIDTr == "" || patternTr == "" {
- return errors.Str("subscriberID or pattern can't be empty")
- }
-
- return eb.subscribe(subID, pattern, ch)
-}
-
-func (eb *eventsBus) Unsubscribe(subID string) {
- eb.subscribers.Delete(subID)
-}
-
-func (eb *eventsBus) UnsubscribeP(subID, pattern string) {
- if sb, ok := eb.subscribers.Load(subID); ok {
- eb.Lock()
- defer eb.Unlock()
-
- sbArr := sb.([]*sub)
-
- for i := 0; i < len(sbArr); i++ {
- if sbArr[i].pattern == pattern {
- sbArr[i] = sbArr[len(sbArr)-1]
- sbArr = sbArr[:len(sbArr)-1]
- // replace with new array
- eb.subscribers.Store(subID, sbArr)
- return
- }
- }
- }
-}
-
-// Send sends event to the events bus
-func (eb *eventsBus) Send(ev Event) {
- // do not accept nil events
- if ev == nil {
- return
- }
-
- eb.internalEvCh <- ev
-}
-
-func (eb *eventsBus) Len() uint {
- var ln uint
-
- eb.subscribers.Range(func(key, value interface{}) bool {
- ln++
- return true
- })
-
- return ln
-}
-
-func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error {
- eb.Lock()
- defer eb.Unlock()
- w, err := newWildcard(pattern)
- if err != nil {
- return err
- }
-
- if sb, ok := eb.subscribers.Load(subID); ok {
- // at this point we are confident that sb is a []*sub type
- subArr := sb.([]*sub)
- subArr = append(subArr, &sub{
- pattern: pattern,
- w: w,
- events: ch,
- })
-
- eb.subscribers.Store(subID, subArr)
-
- return nil
- }
-
- subArr := make([]*sub, 0, 1)
- subArr = append(subArr, &sub{
- pattern: pattern,
- w: w,
- events: ch,
- })
-
- eb.subscribers.Store(subID, subArr)
-
- return nil
-}
-
-func (eb *eventsBus) handleEvents() {
- for { //nolint:gosimple
- select {
- case ev := <-eb.internalEvCh:
- // http.WorkerError for example
- wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String())
-
- eb.subscribers.Range(func(key, value interface{}) bool {
- vsub := value.([]*sub)
-
- for i := 0; i < len(vsub); i++ {
- if vsub[i].w.match(wc) {
- select {
- case vsub[i].events <- ev:
- return true
- default:
- return true
- }
- }
- }
-
- return true
- })
- }
- }
-}
diff --git a/events/init.go b/events/init.go
deleted file mode 100644
index 25e92fc5..00000000
--- a/events/init.go
+++ /dev/null
@@ -1,20 +0,0 @@
-package events
-
-import (
- "sync"
-
- "github.com/google/uuid"
-)
-
-var evBus *eventsBus
-var onInit = &sync.Once{}
-
-func Bus() (*eventsBus, string) {
- onInit.Do(func() {
- evBus = newEventsBus()
- go evBus.handleEvents()
- })
-
- // return events bus with subscriberID
- return evBus, uuid.NewString()
-}
diff --git a/events/types.go b/events/types.go
deleted file mode 100644
index 806e81ce..00000000
--- a/events/types.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package events
-
-import (
- "fmt"
-)
-
-type EventBus interface {
- SubscribeAll(subID string, ch chan<- Event) error
- SubscribeP(subID string, pattern string, ch chan<- Event) error
- Unsubscribe(subID string)
- UnsubscribeP(subID, pattern string)
- Len() uint
- Send(ev Event)
-}
-
-type Event interface {
- Type() fmt.Stringer
- Plugin() string
- Message() string
-}
-
-type event struct {
- // event typ
- typ fmt.Stringer
- // plugin
- plugin string
- // message
- message string
-}
-
-// NewEvent initializes new event
-func NewEvent(t fmt.Stringer, plugin string, message string) *event {
- if t.String() == "" || plugin == "" {
- return nil
- }
-
- return &event{
- typ: t,
- plugin: plugin,
- message: message,
- }
-}
-
-func (r *event) Type() fmt.Stringer {
- return r.typ
-}
-
-func (r *event) Message() string {
- return r.message
-}
-
-func (r *event) Plugin() string {
- return r.plugin
-}
diff --git a/events/wildcard.go b/events/wildcard.go
deleted file mode 100644
index b4c28ae1..00000000
--- a/events/wildcard.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package events
-
-import (
- "strings"
-
- "github.com/spiral/errors"
-)
-
-type wildcard struct {
- prefix string
- suffix string
-}
-
-func newWildcard(pattern string) (*wildcard, error) {
- // Normalize
- origin := strings.ToLower(pattern)
- i := strings.IndexByte(origin, '*')
-
- /*
- http.*
- *
- *.WorkerError
- */
- if i == -1 {
- dotI := strings.IndexByte(pattern, '.')
-
- if dotI == -1 {
- // http.SuperEvent
- return nil, errors.Str("wrong wildcard, no * or . Usage: http.Event or *.Event or http.*")
- }
-
- return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil
- }
-
- // pref: http.
- // suff: *
- return &wildcard{origin[0:i], origin[i+1:]}, nil
-}
-
-func (w wildcard) match(s string) bool {
- s = strings.ToLower(s)
- return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix)
-}
diff --git a/events/wildcard_test.go b/events/wildcard_test.go
deleted file mode 100644
index dfa65e63..00000000
--- a/events/wildcard_test.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package events
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestWildcard(t *testing.T) {
- w, err := newWildcard("http.*")
- assert.NoError(t, err)
- assert.True(t, w.match("http.SuperEvent"))
- assert.False(t, w.match("https.SuperEvent"))
- assert.False(t, w.match(""))
- assert.False(t, w.match("*"))
- assert.False(t, w.match("****"))
- assert.True(t, w.match("http.****"))
-
- // *.* -> *
- w, err = newWildcard("*")
- assert.NoError(t, err)
- assert.True(t, w.match("http.SuperEvent"))
- assert.True(t, w.match("https.SuperEvent"))
- assert.True(t, w.match(""))
- assert.True(t, w.match("*"))
- assert.True(t, w.match("****"))
- assert.True(t, w.match("http.****"))
-
- w, err = newWildcard("*.WorkerError")
- assert.NoError(t, err)
- assert.False(t, w.match("http.SuperEvent"))
- assert.False(t, w.match("https.SuperEvent"))
- assert.False(t, w.match(""))
- assert.False(t, w.match("*"))
- assert.False(t, w.match("****"))
- assert.False(t, w.match("http.****"))
- assert.True(t, w.match("http.WorkerError"))
-
- w, err = newWildcard("http.WorkerError")
- assert.NoError(t, err)
- assert.False(t, w.match("http.SuperEvent"))
- assert.False(t, w.match("https.SuperEvent"))
- assert.False(t, w.match(""))
- assert.False(t, w.match("*"))
- assert.False(t, w.match("****"))
- assert.False(t, w.match("http.****"))
- assert.True(t, w.match("http.WorkerError"))
-
- w, err = newWildcard("http.Worker*")
- assert.NoError(t, err)
- assert.True(t, w.match("http.WorkerFoo"))
- assert.False(t, w.match("h*.SuperEvent"))
- assert.False(t, w.match("h*.Worker"))
-}