summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-30 13:33:06 -0700
committerGitHub <[email protected]>2021-10-30 13:33:06 -0700
commitb78558d0af7f813f81de0338400f99291932347f (patch)
treee71fcd3ed2d8bbc1bb4542290a11d5cb9730a4b1
parentc8c3f9f113eae13aa37cf92043b288bb0c68a622 (diff)
parent1cac0d3efd40e8950004c8d953fb5d1f2d53197b (diff)
[#839]: refactoring(eventsbus): add docs, udpate testsv2.6.0-alpha.2
[#839]: refactoring(eventsbus): add docs, udpate tests
-rw-r--r--events/docs/events.md136
-rw-r--r--events/events.go75
-rw-r--r--events/events_test.go56
-rw-r--r--events/types.go26
-rw-r--r--events/wildcard_test.go6
-rwxr-xr-xpool/static_pool.go1
-rwxr-xr-xtransport/pipe/pipe_factory_test.go2
7 files changed, 204 insertions, 98 deletions
diff --git a/events/docs/events.md b/events/docs/events.md
index e69de29b..37059b25 100644
--- a/events/docs/events.md
+++ b/events/docs/events.md
@@ -0,0 +1,136 @@
+## RoadRunner Events bus
+
+RR events bus might be useful when one plugin raises some event on which another plugin should react. For example,
+plugins like sentry might log errors from the `http` plugin.
+
+## Simple subscription
+
+Events bus supports wildcard subscriptions on the events as well as the direct subscriptions on the particular event.
+
+Let's have a look at the simple example:
+
+```go
+package foo
+
+import (
+ "github.com/spiral/roadrunner/v2/events"
+)
+
+func foo() {
+ eh, id := events.Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan events.Event, 100)
+ err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ if err != nil {
+ panic(err)
+ }
+
+ eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))
+ evt := <-ch
+ // evt.Message() -> "foo"
+ // evt.Plugin() -> "http"
+ // evt.Type().String() -> "EventJobOK"
+}
+```
+
+Here:
+1. `eh, id := events.Bus()` get the instance (it's global) of the events bus. Make sure to unsubscribe event handler when you don't need it anymore: `eh.Unsubscribe(id)`.
+2. `ch := make(chan events.Event, 100)` create an events channel.
+3. `err := eh.SubscribeP(id, "http.EventJobOK", ch)` subscribe to the events which fits your pattern (`http.EventJobOK`).
+4. `eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))` emit event from the any plugin.
+5. `evt := <-ch` get the event.
+
+Notes:
+1. If you use only `eh.Send` events bus function, you don't need to unsubscribe, so, you may simplify the declaration to the `eh, _ := events.Bus()`.
+
+## Wildcards
+
+As mentioned before, RR events bus supports wildcards subscriptions, like: `*.SomeEvent`, `http.*`, `http.Some*`, `*`.
+Let's have a look at the next sample of code:
+
+```go
+package foo
+
+import (
+ "github.com/spiral/roadrunner/v2/events"
+)
+
+func foo() {
+ eh, id := events.Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan events.Event, 100)
+ err := eh.SubscribeP(id, "http.*", ch)
+ if err != nil {
+ panic(err)
+ }
+
+ eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))
+ evt := <-ch
+ // evt.Message() -> "foo"
+ // evt.Plugin() -> "http"
+ // evt.Type().String() -> "EventJobOK"
+}
+```
+
+One change between these samples is in the `SubscribeP` pattern: `err := eh.SubscribeP(id, "http.*", ch)`. Here we used `http.*` instead of `http.EventJobOK`.
+
+You also have the access to the event message, plugin and type. Message is a custom, user-defined message to log or to show to the subscriber. Plugin is a source plugin who raised this event. And the event type - is your custom or RR event type.
+
+
+## How to implement custom event
+
+Event type is a `fmt.Stringer`. That means, that your custom event type should implement this interface. Let's have a look at how to do that:
+
+```go
+package foo
+
+type MySuperEvent uint32
+
+const (
+ EventHTTPError MySuperEvent = iota
+)
+
+func (mse MySuperEvent) String() string {
+ switch mse {
+ case EventHTTPError:
+ return "EventHTTPError"
+ default:
+ return "UnknownEventType"
+ }
+}
+```
+
+Here we defined a custom type - `MySuperEvent`. For sure, it might have any name you want and represent for example some domain field like `WorkersPoolEvent` represents RR sync.pool events. Then you need to implement a `fmt.Stringer` on your custom event type.
+Next you need to create an enum with the actual events and that's it.
+
+How to use that:
+```go
+package foo
+
+import (
+ "github.com/spiral/roadrunner/v2/events"
+)
+
+func foo() {
+ eh, id := events.Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan events.Event, 100)
+ err := eh.SubscribeP(id, "http.EventHTTPError", ch)
+ if err != nil {
+ panic(err)
+ }
+
+ // first arg of the NewEvent method is fmt.Stringer
+ eh.Send(events.NewEvent(EventHTTPError, "http", "foo"))
+ evt := <-ch
+ // evt.Message() -> "foo"
+ // evt.Plugin() -> "http"
+ // evt.Type().String() -> "EventHTTPError"
+}
+```
+
+Important note: you don't need to import your custom event types into the subscriber. You only need to know the name of that event and pass a string to the subscriber.
+
diff --git a/events/events.go b/events/events.go
index b7396653..5a417e7f 100644
--- a/events/events.go
+++ b/events/events.go
@@ -3,72 +3,26 @@ package events
type EventType uint32
const (
- // EventUnaryCallOk represents success unary call response
- EventUnaryCallOk EventType = iota
-
- // EventUnaryCallErr raised when unary call ended with error
- EventUnaryCallErr
-
- // EventPushOK thrown when new job has been added. JobEvent is passed as context.
- EventPushOK
-
- // EventPushError caused when job can not be registered.
- EventPushError
-
- // EventJobStart thrown when new job received.
- EventJobStart
-
- // EventJobOK thrown when job execution is successfully completed. JobEvent is passed as context.
- EventJobOK
-
- // EventJobError thrown on all job related errors. See JobError as context.
- EventJobError
-
- // EventPipeActive when pipeline has started.
- EventPipeActive
-
- // EventPipeStopped when pipeline has been stopped.
- EventPipeStopped
-
- // EventPipePaused when pipeline has been paused.
- EventPipePaused
-
- // EventPipeError when pipeline specific error happen.
- EventPipeError
-
- // EventDriverReady thrown when broken is ready to accept/serve tasks.
- EventDriverReady
-
// EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct
-
+ EventWorkerConstruct EventType = iota
// EventWorkerDestruct thrown after worker destruction.
EventWorkerDestruct
-
// EventSupervisorError triggered when supervisor can not complete work.
EventSupervisorError
-
// EventWorkerProcessExit triggered on process wait exit
EventWorkerProcessExit
-
// EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
EventNoFreeWorkers
-
// EventMaxMemory caused when worker consumes more memory than allowed.
EventMaxMemory
-
// EventTTL thrown when worker is removed due TTL being reached. TTL defines maximum time worker is allowed to live (seconds)
EventTTL
-
// EventIdleTTL triggered when worker spends too much time at rest.
EventIdleTTL
-
// EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
EventExecTTL
-
// EventPoolRestart triggered when pool restart is needed
EventPoolRestart
-
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
EventWorkerError
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
@@ -81,32 +35,6 @@ const (
func (et EventType) String() string {
switch et {
- case EventPushOK:
- return "EventPushOK"
- case EventPushError:
- return "EventPushError"
- case EventJobStart:
- return "EventJobStart"
- case EventJobOK:
- return "EventJobOK"
- case EventJobError:
- return "EventJobError"
- case EventPipeActive:
- return "EventPipeActive"
- case EventPipeStopped:
- return "EventPipeStopped"
- case EventPipeError:
- return "EventPipeError"
- case EventDriverReady:
- return "EventDriverReady"
- case EventPipePaused:
- return "EventPipePaused"
-
- case EventUnaryCallOk:
- return "EventUnaryCallOk"
- case EventUnaryCallErr:
- return "EventUnaryCallErr"
-
case EventWorkerProcessExit:
return "EventWorkerProcessExit"
case EventWorkerConstruct:
@@ -127,7 +55,6 @@ func (et EventType) String() string {
return "EventExecTTL"
case EventPoolRestart:
return "EventPoolRestart"
-
case EventWorkerError:
return "EventWorkerError"
case EventWorkerLog:
diff --git a/events/events_test.go b/events/events_test.go
index e15c55d6..f7cb4205 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -12,15 +12,15 @@ func TestEvenHandler(t *testing.T) {
defer eh.Unsubscribe(id)
ch := make(chan Event, 100)
- err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ err := eh.SubscribeP(id, "http.EventWorkerError", ch)
require.NoError(t, err)
- eh.Send(NewEvent(EventJobOK, "http", "foo"))
+ eh.Send(NewEvent(EventWorkerError, "http", "foo"))
evt := <-ch
require.Equal(t, "foo", evt.Message())
require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventJobOK", evt.Type().String())
+ require.Equal(t, "EventWorkerError", evt.Type().String())
}
func TestEvenHandler2(t *testing.T) {
@@ -31,18 +31,18 @@ func TestEvenHandler2(t *testing.T) {
ch := make(chan Event, 100)
ch2 := make(chan Event, 100)
- err := eh2.SubscribeP(id2, "http.EventJobOK", ch)
+ err := eh2.SubscribeP(id2, "http.EventWorkerError", ch)
require.NoError(t, err)
- err = eh.SubscribeP(id, "http.EventJobOK", ch2)
+ err = eh.SubscribeP(id, "http.EventWorkerError", ch2)
require.NoError(t, err)
- eh.Send(NewEvent(EventJobOK, "http", "foo"))
+ eh.Send(NewEvent(EventWorkerError, "http", "foo"))
evt := <-ch2
require.Equal(t, "foo", evt.Message())
require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventJobOK", evt.Type().String())
+ require.Equal(t, "EventWorkerError", evt.Type().String())
l := eh.Len()
require.Equal(t, uint(2), l)
@@ -65,7 +65,7 @@ func TestEvenHandler3(t *testing.T) {
defer eh.Unsubscribe(id)
ch := make(chan Event, 100)
- err := eh.SubscribeP(id, "EventJobOK", ch)
+ err := eh.SubscribeP(id, "EventWorkerError", ch)
require.Error(t, err)
}
@@ -73,7 +73,7 @@ func TestEvenHandler4(t *testing.T) {
eh, id := Bus()
defer eh.Unsubscribe(id)
- err := eh.SubscribeP(id, "EventJobOK", nil)
+ err := eh.SubscribeP(id, "EventWorkerError", nil)
require.Error(t, err)
}
@@ -82,13 +82,45 @@ func TestEvenHandler5(t *testing.T) {
defer eh.Unsubscribe(id)
ch := make(chan Event, 100)
- err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ err := eh.SubscribeP(id, "http.EventWorkerError", ch)
require.NoError(t, err)
- eh.Send(NewEvent(EventJobOK, "http", "foo"))
+ eh.Send(NewEvent(EventWorkerError, "http", "foo"))
evt := <-ch
require.Equal(t, "foo", evt.Message())
require.Equal(t, "http", evt.Plugin())
- require.Equal(t, "EventJobOK", evt.Type().String())
+ require.Equal(t, "EventWorkerError", evt.Type().String())
+}
+
+type MySuperEvent uint32
+
+const (
+ // EventHTTPError represents success unary call response
+ EventHTTPError MySuperEvent = iota
+)
+
+func (mse MySuperEvent) String() string {
+ switch mse {
+ case EventHTTPError:
+ return "EventHTTPError"
+ default:
+ return "UnknownEventType"
+ }
+}
+
+func TestEvenHandler6(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventHTTPError", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventHTTPError, "http", "foo"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventHTTPError", evt.Type().String())
}
diff --git a/events/types.go b/events/types.go
index 65a76d15..806e81ce 100644
--- a/events/types.go
+++ b/events/types.go
@@ -1,5 +1,9 @@
package events
+import (
+ "fmt"
+)
+
type EventBus interface {
SubscribeAll(subID string, ch chan<- Event) error
SubscribeP(subID string, pattern string, ch chan<- Event) error
@@ -10,14 +14,14 @@ type EventBus interface {
}
type Event interface {
+ Type() fmt.Stringer
Plugin() string
- Type() EventType
Message() string
}
-type RREvent struct {
+type event struct {
// event typ
- typ EventType
+ typ fmt.Stringer
// plugin
plugin string
// message
@@ -25,22 +29,26 @@ type RREvent struct {
}
// NewEvent initializes new event
-func NewEvent(t EventType, plugin string, msg string) *RREvent {
- return &RREvent{
+func NewEvent(t fmt.Stringer, plugin string, message string) *event {
+ if t.String() == "" || plugin == "" {
+ return nil
+ }
+
+ return &event{
typ: t,
plugin: plugin,
- message: msg,
+ message: message,
}
}
-func (r *RREvent) Type() EventType {
+func (r *event) Type() fmt.Stringer {
return r.typ
}
-func (r *RREvent) Message() string {
+func (r *event) Message() string {
return r.message
}
-func (r *RREvent) Plugin() string {
+func (r *event) Plugin() string {
return r.plugin
}
diff --git a/events/wildcard_test.go b/events/wildcard_test.go
index 230ef673..dfa65e63 100644
--- a/events/wildcard_test.go
+++ b/events/wildcard_test.go
@@ -45,4 +45,10 @@ func TestWildcard(t *testing.T) {
assert.False(t, w.match("****"))
assert.False(t, w.match("http.****"))
assert.True(t, w.match("http.WorkerError"))
+
+ w, err = newWildcard("http.Worker*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.WorkerFoo"))
+ assert.False(t, w.match("h*.SuperEvent"))
+ assert.False(t, w.match("h*.Worker"))
}
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 11112e72..a5c98eaa 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -200,7 +200,6 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-//go:inline
func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
if w.State().NumExecs() >= sp.cfg.MaxJobs {
w.State().Set(worker.StateMaxJobsReached)
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index 0f527cd5..848676cf 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -122,7 +122,6 @@ func Test_Pipe_PipeError2(t *testing.T) {
}
func Test_Pipe_Failboot(t *testing.T) {
- t.Parallel()
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
@@ -430,7 +429,6 @@ func Test_Echo_Slow(t *testing.T) {
}
func Test_Broken(t *testing.T) {
- t.Parallel()
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")