diff options
author | Valery Piashchynski <[email protected]> | 2021-10-30 13:33:06 -0700 |
---|---|---|
committer | GitHub <[email protected]> | 2021-10-30 13:33:06 -0700 |
commit | b78558d0af7f813f81de0338400f99291932347f (patch) | |
tree | e71fcd3ed2d8bbc1bb4542290a11d5cb9730a4b1 | |
parent | c8c3f9f113eae13aa37cf92043b288bb0c68a622 (diff) | |
parent | 1cac0d3efd40e8950004c8d953fb5d1f2d53197b (diff) |
[#839]: refactoring(eventsbus): add docs, udpate testsv2.6.0-alpha.2
[#839]: refactoring(eventsbus): add docs, udpate tests
-rw-r--r-- | events/docs/events.md | 136 | ||||
-rw-r--r-- | events/events.go | 75 | ||||
-rw-r--r-- | events/events_test.go | 56 | ||||
-rw-r--r-- | events/types.go | 26 | ||||
-rw-r--r-- | events/wildcard_test.go | 6 | ||||
-rwxr-xr-x | pool/static_pool.go | 1 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 2 |
7 files changed, 204 insertions, 98 deletions
diff --git a/events/docs/events.md b/events/docs/events.md index e69de29b..37059b25 100644 --- a/events/docs/events.md +++ b/events/docs/events.md @@ -0,0 +1,136 @@ +## RoadRunner Events bus + +RR events bus might be useful when one plugin raises some event on which another plugin should react. For example, +plugins like sentry might log errors from the `http` plugin. + +## Simple subscription + +Events bus supports wildcard subscriptions on the events as well as the direct subscriptions on the particular event. + +Let's have a look at the simple example: + +```go +package foo + +import ( + "github.com/spiral/roadrunner/v2/events" +) + +func foo() { + eh, id := events.Bus() + defer eh.Unsubscribe(id) + + ch := make(chan events.Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + if err != nil { + panic(err) + } + + eh.Send(events.NewEvent(events.EventJobOK, "http", "foo")) + evt := <-ch + // evt.Message() -> "foo" + // evt.Plugin() -> "http" + // evt.Type().String() -> "EventJobOK" +} +``` + +Here: +1. `eh, id := events.Bus()` get the instance (it's global) of the events bus. Make sure to unsubscribe event handler when you don't need it anymore: `eh.Unsubscribe(id)`. +2. `ch := make(chan events.Event, 100)` create an events channel. +3. `err := eh.SubscribeP(id, "http.EventJobOK", ch)` subscribe to the events which fits your pattern (`http.EventJobOK`). +4. `eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))` emit event from the any plugin. +5. `evt := <-ch` get the event. + +Notes: +1. If you use only `eh.Send` events bus function, you don't need to unsubscribe, so, you may simplify the declaration to the `eh, _ := events.Bus()`. + +## Wildcards + +As mentioned before, RR events bus supports wildcards subscriptions, like: `*.SomeEvent`, `http.*`, `http.Some*`, `*`. +Let's have a look at the next sample of code: + +```go +package foo + +import ( + "github.com/spiral/roadrunner/v2/events" +) + +func foo() { + eh, id := events.Bus() + defer eh.Unsubscribe(id) + + ch := make(chan events.Event, 100) + err := eh.SubscribeP(id, "http.*", ch) + if err != nil { + panic(err) + } + + eh.Send(events.NewEvent(events.EventJobOK, "http", "foo")) + evt := <-ch + // evt.Message() -> "foo" + // evt.Plugin() -> "http" + // evt.Type().String() -> "EventJobOK" +} +``` + +One change between these samples is in the `SubscribeP` pattern: `err := eh.SubscribeP(id, "http.*", ch)`. Here we used `http.*` instead of `http.EventJobOK`. + +You also have the access to the event message, plugin and type. Message is a custom, user-defined message to log or to show to the subscriber. Plugin is a source plugin who raised this event. And the event type - is your custom or RR event type. + + +## How to implement custom event + +Event type is a `fmt.Stringer`. That means, that your custom event type should implement this interface. Let's have a look at how to do that: + +```go +package foo + +type MySuperEvent uint32 + +const ( + EventHTTPError MySuperEvent = iota +) + +func (mse MySuperEvent) String() string { + switch mse { + case EventHTTPError: + return "EventHTTPError" + default: + return "UnknownEventType" + } +} +``` + +Here we defined a custom type - `MySuperEvent`. For sure, it might have any name you want and represent for example some domain field like `WorkersPoolEvent` represents RR sync.pool events. Then you need to implement a `fmt.Stringer` on your custom event type. +Next you need to create an enum with the actual events and that's it. + +How to use that: +```go +package foo + +import ( + "github.com/spiral/roadrunner/v2/events" +) + +func foo() { + eh, id := events.Bus() + defer eh.Unsubscribe(id) + + ch := make(chan events.Event, 100) + err := eh.SubscribeP(id, "http.EventHTTPError", ch) + if err != nil { + panic(err) + } + + // first arg of the NewEvent method is fmt.Stringer + eh.Send(events.NewEvent(EventHTTPError, "http", "foo")) + evt := <-ch + // evt.Message() -> "foo" + // evt.Plugin() -> "http" + // evt.Type().String() -> "EventHTTPError" +} +``` + +Important note: you don't need to import your custom event types into the subscriber. You only need to know the name of that event and pass a string to the subscriber. + diff --git a/events/events.go b/events/events.go index b7396653..5a417e7f 100644 --- a/events/events.go +++ b/events/events.go @@ -3,72 +3,26 @@ package events type EventType uint32 const ( - // EventUnaryCallOk represents success unary call response - EventUnaryCallOk EventType = iota - - // EventUnaryCallErr raised when unary call ended with error - EventUnaryCallErr - - // EventPushOK thrown when new job has been added. JobEvent is passed as context. - EventPushOK - - // EventPushError caused when job can not be registered. - EventPushError - - // EventJobStart thrown when new job received. - EventJobStart - - // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context. - EventJobOK - - // EventJobError thrown on all job related errors. See JobError as context. - EventJobError - - // EventPipeActive when pipeline has started. - EventPipeActive - - // EventPipeStopped when pipeline has been stopped. - EventPipeStopped - - // EventPipePaused when pipeline has been paused. - EventPipePaused - - // EventPipeError when pipeline specific error happen. - EventPipeError - - // EventDriverReady thrown when broken is ready to accept/serve tasks. - EventDriverReady - // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct - + EventWorkerConstruct EventType = iota // EventWorkerDestruct thrown after worker destruction. EventWorkerDestruct - // EventSupervisorError triggered when supervisor can not complete work. EventSupervisorError - // EventWorkerProcessExit triggered on process wait exit EventWorkerProcessExit - // EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed EventNoFreeWorkers - // EventMaxMemory caused when worker consumes more memory than allowed. EventMaxMemory - // EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds) EventTTL - // EventIdleTTL triggered when worker spends too much time at rest. EventIdleTTL - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). EventExecTTL - // EventPoolRestart triggered when pool restart is needed EventPoolRestart - // EventWorkerError triggered after WorkerProcess. Except payload to be error. EventWorkerError // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. @@ -81,32 +35,6 @@ const ( func (et EventType) String() string { switch et { - case EventPushOK: - return "EventPushOK" - case EventPushError: - return "EventPushError" - case EventJobStart: - return "EventJobStart" - case EventJobOK: - return "EventJobOK" - case EventJobError: - return "EventJobError" - case EventPipeActive: - return "EventPipeActive" - case EventPipeStopped: - return "EventPipeStopped" - case EventPipeError: - return "EventPipeError" - case EventDriverReady: - return "EventDriverReady" - case EventPipePaused: - return "EventPipePaused" - - case EventUnaryCallOk: - return "EventUnaryCallOk" - case EventUnaryCallErr: - return "EventUnaryCallErr" - case EventWorkerProcessExit: return "EventWorkerProcessExit" case EventWorkerConstruct: @@ -127,7 +55,6 @@ func (et EventType) String() string { return "EventExecTTL" case EventPoolRestart: return "EventPoolRestart" - case EventWorkerError: return "EventWorkerError" case EventWorkerLog: diff --git a/events/events_test.go b/events/events_test.go index e15c55d6..f7cb4205 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -12,15 +12,15 @@ func TestEvenHandler(t *testing.T) { defer eh.Unsubscribe(id) ch := make(chan Event, 100) - err := eh.SubscribeP(id, "http.EventJobOK", ch) + err := eh.SubscribeP(id, "http.EventWorkerError", ch) require.NoError(t, err) - eh.Send(NewEvent(EventJobOK, "http", "foo")) + eh.Send(NewEvent(EventWorkerError, "http", "foo")) evt := <-ch require.Equal(t, "foo", evt.Message()) require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventJobOK", evt.Type().String()) + require.Equal(t, "EventWorkerError", evt.Type().String()) } func TestEvenHandler2(t *testing.T) { @@ -31,18 +31,18 @@ func TestEvenHandler2(t *testing.T) { ch := make(chan Event, 100) ch2 := make(chan Event, 100) - err := eh2.SubscribeP(id2, "http.EventJobOK", ch) + err := eh2.SubscribeP(id2, "http.EventWorkerError", ch) require.NoError(t, err) - err = eh.SubscribeP(id, "http.EventJobOK", ch2) + err = eh.SubscribeP(id, "http.EventWorkerError", ch2) require.NoError(t, err) - eh.Send(NewEvent(EventJobOK, "http", "foo")) + eh.Send(NewEvent(EventWorkerError, "http", "foo")) evt := <-ch2 require.Equal(t, "foo", evt.Message()) require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventJobOK", evt.Type().String()) + require.Equal(t, "EventWorkerError", evt.Type().String()) l := eh.Len() require.Equal(t, uint(2), l) @@ -65,7 +65,7 @@ func TestEvenHandler3(t *testing.T) { defer eh.Unsubscribe(id) ch := make(chan Event, 100) - err := eh.SubscribeP(id, "EventJobOK", ch) + err := eh.SubscribeP(id, "EventWorkerError", ch) require.Error(t, err) } @@ -73,7 +73,7 @@ func TestEvenHandler4(t *testing.T) { eh, id := Bus() defer eh.Unsubscribe(id) - err := eh.SubscribeP(id, "EventJobOK", nil) + err := eh.SubscribeP(id, "EventWorkerError", nil) require.Error(t, err) } @@ -82,13 +82,45 @@ func TestEvenHandler5(t *testing.T) { defer eh.Unsubscribe(id) ch := make(chan Event, 100) - err := eh.SubscribeP(id, "http.EventJobOK", ch) + err := eh.SubscribeP(id, "http.EventWorkerError", ch) require.NoError(t, err) - eh.Send(NewEvent(EventJobOK, "http", "foo")) + eh.Send(NewEvent(EventWorkerError, "http", "foo")) evt := <-ch require.Equal(t, "foo", evt.Message()) require.Equal(t, "http", evt.Plugin()) - require.Equal(t, "EventJobOK", evt.Type().String()) + require.Equal(t, "EventWorkerError", evt.Type().String()) +} + +type MySuperEvent uint32 + +const ( + // EventHTTPError represents success unary call response + EventHTTPError MySuperEvent = iota +) + +func (mse MySuperEvent) String() string { + switch mse { + case EventHTTPError: + return "EventHTTPError" + default: + return "UnknownEventType" + } +} + +func TestEvenHandler6(t *testing.T) { + eh, id := Bus() + defer eh.Unsubscribe(id) + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventHTTPError", ch) + require.NoError(t, err) + + eh.Send(NewEvent(EventHTTPError, "http", "foo")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventHTTPError", evt.Type().String()) } diff --git a/events/types.go b/events/types.go index 65a76d15..806e81ce 100644 --- a/events/types.go +++ b/events/types.go @@ -1,5 +1,9 @@ package events +import ( + "fmt" +) + type EventBus interface { SubscribeAll(subID string, ch chan<- Event) error SubscribeP(subID string, pattern string, ch chan<- Event) error @@ -10,14 +14,14 @@ type EventBus interface { } type Event interface { + Type() fmt.Stringer Plugin() string - Type() EventType Message() string } -type RREvent struct { +type event struct { // event typ - typ EventType + typ fmt.Stringer // plugin plugin string // message @@ -25,22 +29,26 @@ type RREvent struct { } // NewEvent initializes new event -func NewEvent(t EventType, plugin string, msg string) *RREvent { - return &RREvent{ +func NewEvent(t fmt.Stringer, plugin string, message string) *event { + if t.String() == "" || plugin == "" { + return nil + } + + return &event{ typ: t, plugin: plugin, - message: msg, + message: message, } } -func (r *RREvent) Type() EventType { +func (r *event) Type() fmt.Stringer { return r.typ } -func (r *RREvent) Message() string { +func (r *event) Message() string { return r.message } -func (r *RREvent) Plugin() string { +func (r *event) Plugin() string { return r.plugin } diff --git a/events/wildcard_test.go b/events/wildcard_test.go index 230ef673..dfa65e63 100644 --- a/events/wildcard_test.go +++ b/events/wildcard_test.go @@ -45,4 +45,10 @@ func TestWildcard(t *testing.T) { assert.False(t, w.match("****")) assert.False(t, w.match("http.****")) assert.True(t, w.match("http.WorkerError")) + + w, err = newWildcard("http.Worker*") + assert.NoError(t, err) + assert.True(t, w.match("http.WorkerFoo")) + assert.False(t, w.match("h*.SuperEvent")) + assert.False(t, w.match("h*.Worker")) } diff --git a/pool/static_pool.go b/pool/static_pool.go index 11112e72..a5c98eaa 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -200,7 +200,6 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) { } // checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs -//go:inline func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) { if w.State().NumExecs() >= sp.cfg.MaxJobs { w.State().Set(worker.StateMaxJobsReached) diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index 0f527cd5..848676cf 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -122,7 +122,6 @@ func Test_Pipe_PipeError2(t *testing.T) { } func Test_Pipe_Failboot(t *testing.T) { - t.Parallel() cmd := exec.Command("php", "../../tests/failboot.php") ctx := context.Background() @@ -430,7 +429,6 @@ func Test_Echo_Slow(t *testing.T) { } func Test_Broken(t *testing.T) { - t.Parallel() ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") |