diff options
author | Valery Piashchynski <[email protected]> | 2021-10-27 13:10:49 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-10-27 13:10:49 +0300 |
commit | 82ee0c5a87141f31e4e28089c1a269f70066284b (patch) | |
tree | 0c5a2c02172b64b1ecb53c85c2c3640a3fdb32e4 | |
parent | 9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (diff) |
Add more tests, add Len() method
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | events/events_test.go | 63 | ||||
-rw-r--r-- | events/eventsbus.go | 43 | ||||
-rw-r--r-- | events/types.go | 11 | ||||
-rw-r--r-- | events/wildcard.go | 2 |
4 files changed, 108 insertions, 11 deletions
diff --git a/events/events_test.go b/events/events_test.go index 6c501392..41944b48 100644 --- a/events/events_test.go +++ b/events/events_test.go @@ -20,3 +20,66 @@ func TestEvenHandler(t *testing.T) { require.Equal(t, "http", evt.Plugin()) require.Equal(t, "EventJobOK", evt.Type().String()) } + +func TestEvenHandler2(t *testing.T) { + eh, id := Bus() + eh2, id2 := Bus() + + ch := make(chan Event, 100) + ch2 := make(chan Event, 100) + err := eh2.SubscribeP(id2, "http.EventJobOK", ch) + require.NoError(t, err) + + err = eh.SubscribeP(id, "http.EventJobOK", ch2) + require.NoError(t, err) + + eh.Send(NewRREvent(EventJobOK, "foo", "http")) + + evt := <-ch2 + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) + + l := eh.Len() + require.Equal(t, uint(2), l) + + eh.Unsubscribe(id) + + l = eh.Len() + require.Equal(t, uint(1), l) + + eh2.Unsubscribe(id2) + + l = eh.Len() + require.Equal(t, uint(0), l) +} + +func TestEvenHandler3(t *testing.T) { + eh, id := Bus() + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "EventJobOK", ch) + require.Error(t, err) +} + +func TestEvenHandler4(t *testing.T) { + eh, id := Bus() + + err := eh.SubscribeP(id, "EventJobOK", nil) + require.Error(t, err) +} + +func TestEvenHandler5(t *testing.T) { + eh, id := Bus() + + ch := make(chan Event, 100) + err := eh.SubscribeP(id, "http.EventJobOK", ch) + require.NoError(t, err) + + eh.Send(NewRREvent(EventJobOK, "foo", "http")) + + evt := <-ch + require.Equal(t, "foo", evt.Message()) + require.Equal(t, "http", evt.Plugin()) + require.Equal(t, "EventJobOK", evt.Type().String()) +} diff --git a/events/eventsbus.go b/events/eventsbus.go index 79f5babd..cd0dca71 100644 --- a/events/eventsbus.go +++ b/events/eventsbus.go @@ -2,7 +2,10 @@ package events import ( "fmt" + "strings" "sync" + + "github.com/spiral/errors" ) type sub struct { @@ -32,17 +35,39 @@ http.* <- // SubscribeAll for all RR events // returns subscriptionID func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + + if subIDTr == "" { + return errors.Str("subscriberID can't be empty") + } + return eb.subscribe(subID, "*", ch) } // SubscribeP pattern like "pluginName.EventType" func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + patternTr := strings.Trim(pattern, " ") + + if subIDTr == "" || patternTr == "" { + return errors.Str("subscriberID or pattern can't be empty") + } + return eb.subscribe(subID, pattern, ch) } func (eb *eventsBus) Unsubscribe(subID string) { eb.subscribers.Delete(subID) } + func (eb *eventsBus) UnsubscribeP(subID, pattern string) { if sb, ok := eb.subscribers.Load(subID); ok { eb.Lock() @@ -64,9 +89,25 @@ func (eb *eventsBus) UnsubscribeP(subID, pattern string) { // Send sends event to the events bus func (eb *eventsBus) Send(ev Event) { + // do not accept nil events + if ev == nil { + return + } + eb.internalEvCh <- ev } +func (eb *eventsBus) Len() uint { + var ln uint + + eb.subscribers.Range(func(key, value interface{}) bool { + ln++ + return true + }) + + return ln +} + func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error { eb.Lock() defer eb.Unlock() @@ -105,7 +146,7 @@ func (eb *eventsBus) handleEvents() { for { //nolint:gosimple select { case ev := <-eb.internalEvCh: - // + // http.WorkerError for example wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String()) eb.subscribers.Range(func(key, value interface{}) bool { diff --git a/events/types.go b/events/types.go index 09bbf7a7..d8e40084 100644 --- a/events/types.go +++ b/events/types.go @@ -1,19 +1,15 @@ package events -import ( - "fmt" -) - type EventBus interface { SubscribeAll(subID string, ch chan<- Event) error SubscribeP(subID string, pattern string, ch chan<- Event) error Unsubscribe(subID string) UnsubscribeP(subID, pattern string) + Len() uint Send(ev Event) } type Event interface { - fmt.Stringer Plugin() string Type() EventType Message() string @@ -30,6 +26,7 @@ type RREvent struct { // NewRREvent initializes new event func NewRREvent(t EventType, msg string, plugin string) *RREvent { + // get return &RREvent{ T: t, P: plugin, @@ -37,10 +34,6 @@ func NewRREvent(t EventType, msg string, plugin string) *RREvent { } } -func (r *RREvent) String() string { - return "RoadRunner event" -} - func (r *RREvent) Type() EventType { return r.T } diff --git a/events/wildcard.go b/events/wildcard.go index 171cbf9d..b4c28ae1 100644 --- a/events/wildcard.go +++ b/events/wildcard.go @@ -26,7 +26,7 @@ func newWildcard(pattern string) (*wildcard, error) { if dotI == -1 { // http.SuperEvent - return nil, errors.Str("wrong wildcard, no * or .") + return nil, errors.Str("wrong wildcard, no * or . Usage: http.Event or *.Event or http.*") } return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil |