summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--events/docs/events.md136
-rw-r--r--events/events_test.go32
-rw-r--r--events/types.go26
-rw-r--r--events/wildcard_test.go6
-rwxr-xr-xpool/static_pool.go1
-rwxr-xr-xtransport/pipe/pipe_factory_test.go2
6 files changed, 191 insertions, 12 deletions
diff --git a/events/docs/events.md b/events/docs/events.md
index e69de29b..37059b25 100644
--- a/events/docs/events.md
+++ b/events/docs/events.md
@@ -0,0 +1,136 @@
+## RoadRunner Events bus
+
+RR events bus might be useful when one plugin raises some event on which another plugin should react. For example,
+plugins like sentry might log errors from the `http` plugin.
+
+## Simple subscription
+
+Events bus supports wildcard subscriptions on the events as well as the direct subscriptions on the particular event.
+
+Let's have a look at the simple example:
+
+```go
+package foo
+
+import (
+ "github.com/spiral/roadrunner/v2/events"
+)
+
+func foo() {
+ eh, id := events.Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan events.Event, 100)
+ err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ if err != nil {
+ panic(err)
+ }
+
+ eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))
+ evt := <-ch
+ // evt.Message() -> "foo"
+ // evt.Plugin() -> "http"
+ // evt.Type().String() -> "EventJobOK"
+}
+```
+
+Here:
+1. `eh, id := events.Bus()` get the instance (it's global) of the events bus. Make sure to unsubscribe event handler when you don't need it anymore: `eh.Unsubscribe(id)`.
+2. `ch := make(chan events.Event, 100)` create an events channel.
+3. `err := eh.SubscribeP(id, "http.EventJobOK", ch)` subscribe to the events which fits your pattern (`http.EventJobOK`).
+4. `eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))` emit event from the any plugin.
+5. `evt := <-ch` get the event.
+
+Notes:
+1. If you use only `eh.Send` events bus function, you don't need to unsubscribe, so, you may simplify the declaration to the `eh, _ := events.Bus()`.
+
+## Wildcards
+
+As mentioned before, RR events bus supports wildcards subscriptions, like: `*.SomeEvent`, `http.*`, `http.Some*`, `*`.
+Let's have a look at the next sample of code:
+
+```go
+package foo
+
+import (
+ "github.com/spiral/roadrunner/v2/events"
+)
+
+func foo() {
+ eh, id := events.Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan events.Event, 100)
+ err := eh.SubscribeP(id, "http.*", ch)
+ if err != nil {
+ panic(err)
+ }
+
+ eh.Send(events.NewEvent(events.EventJobOK, "http", "foo"))
+ evt := <-ch
+ // evt.Message() -> "foo"
+ // evt.Plugin() -> "http"
+ // evt.Type().String() -> "EventJobOK"
+}
+```
+
+One change between these samples is in the `SubscribeP` pattern: `err := eh.SubscribeP(id, "http.*", ch)`. Here we used `http.*` instead of `http.EventJobOK`.
+
+You also have the access to the event message, plugin and type. Message is a custom, user-defined message to log or to show to the subscriber. Plugin is a source plugin who raised this event. And the event type - is your custom or RR event type.
+
+
+## How to implement custom event
+
+Event type is a `fmt.Stringer`. That means, that your custom event type should implement this interface. Let's have a look at how to do that:
+
+```go
+package foo
+
+type MySuperEvent uint32
+
+const (
+ EventHTTPError MySuperEvent = iota
+)
+
+func (mse MySuperEvent) String() string {
+ switch mse {
+ case EventHTTPError:
+ return "EventHTTPError"
+ default:
+ return "UnknownEventType"
+ }
+}
+```
+
+Here we defined a custom type - `MySuperEvent`. For sure, it might have any name you want and represent for example some domain field like `WorkersPoolEvent` represents RR sync.pool events. Then you need to implement a `fmt.Stringer` on your custom event type.
+Next you need to create an enum with the actual events and that's it.
+
+How to use that:
+```go
+package foo
+
+import (
+ "github.com/spiral/roadrunner/v2/events"
+)
+
+func foo() {
+ eh, id := events.Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan events.Event, 100)
+ err := eh.SubscribeP(id, "http.EventHTTPError", ch)
+ if err != nil {
+ panic(err)
+ }
+
+ // first arg of the NewEvent method is fmt.Stringer
+ eh.Send(events.NewEvent(EventHTTPError, "http", "foo"))
+ evt := <-ch
+ // evt.Message() -> "foo"
+ // evt.Plugin() -> "http"
+ // evt.Type().String() -> "EventHTTPError"
+}
+```
+
+Important note: you don't need to import your custom event types into the subscriber. You only need to know the name of that event and pass a string to the subscriber.
+
diff --git a/events/events_test.go b/events/events_test.go
index e15c55d6..fd8a8a05 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -92,3 +92,35 @@ func TestEvenHandler5(t *testing.T) {
require.Equal(t, "http", evt.Plugin())
require.Equal(t, "EventJobOK", evt.Type().String())
}
+
+type MySuperEvent uint32
+
+const (
+ // EventHTTPError represents success unary call response
+ EventHTTPError MySuperEvent = iota
+)
+
+func (mse MySuperEvent) String() string {
+ switch mse {
+ case EventHTTPError:
+ return "EventHTTPError"
+ default:
+ return "UnknownEventType"
+ }
+}
+
+func TestEvenHandler6(t *testing.T) {
+ eh, id := Bus()
+ defer eh.Unsubscribe(id)
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventHTTPError", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewEvent(EventHTTPError, "http", "foo"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventHTTPError", evt.Type().String())
+}
diff --git a/events/types.go b/events/types.go
index 65a76d15..806e81ce 100644
--- a/events/types.go
+++ b/events/types.go
@@ -1,5 +1,9 @@
package events
+import (
+ "fmt"
+)
+
type EventBus interface {
SubscribeAll(subID string, ch chan<- Event) error
SubscribeP(subID string, pattern string, ch chan<- Event) error
@@ -10,14 +14,14 @@ type EventBus interface {
}
type Event interface {
+ Type() fmt.Stringer
Plugin() string
- Type() EventType
Message() string
}
-type RREvent struct {
+type event struct {
// event typ
- typ EventType
+ typ fmt.Stringer
// plugin
plugin string
// message
@@ -25,22 +29,26 @@ type RREvent struct {
}
// NewEvent initializes new event
-func NewEvent(t EventType, plugin string, msg string) *RREvent {
- return &RREvent{
+func NewEvent(t fmt.Stringer, plugin string, message string) *event {
+ if t.String() == "" || plugin == "" {
+ return nil
+ }
+
+ return &event{
typ: t,
plugin: plugin,
- message: msg,
+ message: message,
}
}
-func (r *RREvent) Type() EventType {
+func (r *event) Type() fmt.Stringer {
return r.typ
}
-func (r *RREvent) Message() string {
+func (r *event) Message() string {
return r.message
}
-func (r *RREvent) Plugin() string {
+func (r *event) Plugin() string {
return r.plugin
}
diff --git a/events/wildcard_test.go b/events/wildcard_test.go
index 230ef673..dfa65e63 100644
--- a/events/wildcard_test.go
+++ b/events/wildcard_test.go
@@ -45,4 +45,10 @@ func TestWildcard(t *testing.T) {
assert.False(t, w.match("****"))
assert.False(t, w.match("http.****"))
assert.True(t, w.match("http.WorkerError"))
+
+ w, err = newWildcard("http.Worker*")
+ assert.NoError(t, err)
+ assert.True(t, w.match("http.WorkerFoo"))
+ assert.False(t, w.match("h*.SuperEvent"))
+ assert.False(t, w.match("h*.Worker"))
}
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 11112e72..a5c98eaa 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -200,7 +200,6 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-//go:inline
func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
if w.State().NumExecs() >= sp.cfg.MaxJobs {
w.State().Set(worker.StateMaxJobsReached)
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index 0f527cd5..848676cf 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -122,7 +122,6 @@ func Test_Pipe_PipeError2(t *testing.T) {
}
func Test_Pipe_Failboot(t *testing.T) {
- t.Parallel()
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
@@ -430,7 +429,6 @@ func Test_Echo_Slow(t *testing.T) {
}
func Test_Broken(t *testing.T) {
- t.Parallel()
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")