diff options
-rw-r--r-- | events/docs/events.md | 136 | ||||
-rw-r--r-- | events/events_test.go | 32 | ||||
-rw-r--r-- | events/types.go | 26 | ||||
-rw-r--r-- | events/wildcard_test.go | 6 | ||||
-rwxr-xr-x | pool/static_pool.go | 1 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 2 |
6 files changed, 191 insertions, 12 deletions
diff --git a/events/docs/events.md b/events/docs/events.md index e69de29b..37059b25 100644 --- a/events/docs/events.md +++ b/events/docs/events.md @@ -0,0 +1,136 @@ +## RoadRunner Events bus + +RR events bus might be useful when one plugin raises some event on which another plugin should react. For example, +plugins like sentry might log errors from the `http` plugin. + +## Simple subscription + +Events bus supports wildcard subscriptions on the events as well as the direct subscriptions on the particular event. + +Let's have a look at the simple example: + +```go +package foo + +import ( + "github.com/spiral/roadrunner/v2/events" +) + +func foo() { + eh, id := events.Bus() + defer eh.Unsubscribe(id) + + ch := make(chan events.Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + if err != nil { + panic(err) + } + + eh.Send(events.NewEvent(events.EventJobOK, "http", "foo")) + evt := <-ch + // evt.Message() -> "foo" + // evt.Plugin() -> "http" + // evt.Type().String() -> "EventJobOK" +} +``` + +Here: +1. `eh, id := events.Bus()` get the instance (it's global) of the events bus. Make sure to unsubscribe event handler when you don't need it anymore: `eh.Unsubscribe(id)`. +2. `ch := make(chan events.Event, 100)` create an events channel. +3. `err := eh.SubscribeP(id, "http.EventJobOK", ch)` subscribe to the events which fits your pattern (`http.EventJobOK`). +4. `eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))` emit event from the any plugin. +5. `evt := <-ch` get the event. + +Notes: +1. If you use only `eh.Send` events bus function, you don't need to unsubscribe, so, you may simplify the declaration to the `eh, _ := events.Bus()`. + +## Wildcards + +As mentioned before, RR events bus supports wildcards subscriptions, like: `*.SomeEvent`, `http.*`, `http.Some*`, `*`. +Let's have a look at the next sample of code: + +```go +package foo + +import ( + "github.com/spiral/roadrunner/v2/events" +) + +func foo() { + eh, id := events.Bus() + defer eh.Unsubscribe(id) + + ch := make(chan events.Event, 100) + err := eh.SubscribeP(id, "http.*", ch) + if err != nil { + panic(err) + } + + eh.Send(events.NewEvent(events.EventJobOK, "http", "foo")) + evt := <-ch + // evt.Message() -> "foo" + // evt.Plugin() -> "http" + // evt.Type().String() -> "EventJobOK" +} +``` + +One change between these samples is in the `SubscribeP` pattern: `err := eh.SubscribeP(id, "http.*", ch)`. Here we used `http.*` instead of `http.EventJobOK`. + +You also have the access to the event message, plugin and type. Message is a custom, user-defined message to log or to show to the subscriber. Plugin is a source plugin who raised this event. And the event type - is your custom or RR event type. + + +## How to implement custom event + +Event type is a `fmt.Stringer`. That means, that your custom event type should implement this interface. Let's have a look at how to do that: + +```go +package foo + +type MySuperEvent uint32 + +const ( + EventHTTPError MySuperEvent = iota +) + +func (mse MySuperEvent) String() string { + switch mse { + case EventHTTPError: + return "EventHTTPError" + default: + return "UnknownEventType" + } +} +``` + +Here we defined a custom type - `MySuperEvent`. For sure, it might have any name you want and represent for example some domain field like `WorkersPoolEvent` represents RR sync.pool events. Then you need to implement a `fmt.Stringer` on your custom event type. +Next you need to create an enum with the actual events and that's it. + +How to use that: +```go +package foo + +import ( + "github.com/spiral/roadrunner/v2/events" +) + +func foo() { + eh, id := events.Bus() + defer eh.Unsubscribe(id) + + ch := make(chan events.Event, 100) + err := eh.SubscribeP(id, "http.EventHTTPError", ch) + if err != nil { + panic(err) + } + + // first arg of the NewEvent method is fmt.Stringer + eh.Send(events.NewEvent(EventHTTPError, "http", "foo")) + evt := <-ch + // evt.Message() -> "foo" + // evt.Plugin() -> "http" + // evt.Type().String() -> "EventHTTPError" +} +``` + +Important note: you don't need to import your custom event types into the subscriber. You only need to know the name of that event and pass a string to the subscriber. + diff --git a/events/events_test.go b/events/events_test.go index e15c55d6..fd8a8a05 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -92,3 +92,35 @@ func TestEvenHandler5(t *testing.T) { require.Equal(t, "http", evt.Plugin()) require.Equal(t, "EventJobOK", evt.Type().String()) } + +type MySuperEvent uint32 + +const ( + // EventHTTPError represents success unary call response + EventHTTPError MySuperEvent = iota +) + +func (mse MySuperEvent) String() string { + switch mse { + case EventHTTPError: + return "EventHTTPError" + default: + return "UnknownEventType" + } +} + +func TestEvenHandler6(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventHTTPError", ch) + require.NoError(t, err) + + eh.Send(NewEvent(EventHTTPError, "http", "foo")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventHTTPError", evt.Type().String()) +} diff --git a/events/types.go b/events/types.go index 65a76d15..806e81ce 100644 --- a/events/types.go +++ b/events/types.go @@ -1,5 +1,9 @@ package events +import ( + "fmt" +) + type EventBus interface { SubscribeAll(subID string, ch chan<- Event) error SubscribeP(subID string, pattern string, ch chan<- Event) error @@ -10,14 +14,14 @@ type EventBus interface { } type Event interface { + Type() fmt.Stringer Plugin() string - Type() EventType Message() string } -type RREvent struct { +type event struct { // event typ - typ EventType + typ fmt.Stringer // plugin plugin string // message @@ -25,22 +29,26 @@ type RREvent struct { } // NewEvent initializes new event -func NewEvent(t EventType, plugin string, msg string) *RREvent { - return &RREvent{ +func NewEvent(t fmt.Stringer, plugin string, message string) *event { + if t.String() == "" || plugin == "" { + return nil + } + + return &event{ typ: t, plugin: plugin, - message: msg, + message: message, } } -func (r *RREvent) Type() EventType { +func (r *event) Type() fmt.Stringer { return r.typ } -func (r *RREvent) Message() string { +func (r *event) Message() string { return r.message } -func (r *RREvent) Plugin() string { +func (r *event) Plugin() string { return r.plugin } diff --git a/events/wildcard_test.go b/events/wildcard_test.go index 230ef673..dfa65e63 100644 --- a/events/wildcard_test.go +++ b/events/wildcard_test.go @@ -45,4 +45,10 @@ func TestWildcard(t *testing.T) { assert.False(t, w.match("****")) assert.False(t, w.match("http.****")) assert.True(t, w.match("http.WorkerError")) + + w, err = newWildcard("http.Worker*") + assert.NoError(t, err) + assert.True(t, w.match("http.WorkerFoo")) + assert.False(t, w.match("h*.SuperEvent")) + assert.False(t, w.match("h*.Worker")) } diff --git a/pool/static_pool.go b/pool/static_pool.go index 11112e72..a5c98eaa 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -200,7 +200,6 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { } // checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs -//go:inline func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) { if w.State().NumExecs() >= sp.cfg.MaxJobs { w.State().Set(worker.StateMaxJobsReached) diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index 0f527cd5..848676cf 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -122,7 +122,6 @@ func Test_Pipe_PipeError2(t *testing.T) { } func Test_Pipe_Failboot(t *testing.T) { - t.Parallel() cmd := exec.Command("php", "../../tests/failboot.php") ctx := context.Background() @@ -430,7 +429,6 @@ func Test_Echo_Slow(t *testing.T) { } func Test_Broken(t *testing.T) { - t.Parallel() ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") |