summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-27 13:10:49 +0300
committerValery Piashchynski <[email protected]>2021-10-27 13:10:49 +0300
commit82ee0c5a87141f31e4e28089c1a269f70066284b (patch)
tree0c5a2c02172b64b1ecb53c85c2c3640a3fdb32e4
parent9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (diff)
Add more tests, add Len() method
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--events/events_test.go63
-rw-r--r--events/eventsbus.go43
-rw-r--r--events/types.go11
-rw-r--r--events/wildcard.go2
4 files changed, 108 insertions, 11 deletions
diff --git a/events/events_test.go b/events/events_test.go
index 6c501392..41944b48 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -20,3 +20,66 @@ func TestEvenHandler(t *testing.T) {
require.Equal(t, "http", evt.Plugin())
require.Equal(t, "EventJobOK", evt.Type().String())
}
+
+func TestEvenHandler2(t *testing.T) {
+ eh, id := Bus()
+ eh2, id2 := Bus()
+
+ ch := make(chan Event, 100)
+ ch2 := make(chan Event, 100)
+ err := eh2.SubscribeP(id2, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ err = eh.SubscribeP(id, "http.EventJobOK", ch2)
+ require.NoError(t, err)
+
+ eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+
+ evt := <-ch2
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+
+ l := eh.Len()
+ require.Equal(t, uint(2), l)
+
+ eh.Unsubscribe(id)
+
+ l = eh.Len()
+ require.Equal(t, uint(1), l)
+
+ eh2.Unsubscribe(id2)
+
+ l = eh.Len()
+ require.Equal(t, uint(0), l)
+}
+
+func TestEvenHandler3(t *testing.T) {
+ eh, id := Bus()
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "EventJobOK", ch)
+ require.Error(t, err)
+}
+
+func TestEvenHandler4(t *testing.T) {
+ eh, id := Bus()
+
+ err := eh.SubscribeP(id, "EventJobOK", nil)
+ require.Error(t, err)
+}
+
+func TestEvenHandler5(t *testing.T) {
+ eh, id := Bus()
+
+ ch := make(chan Event, 100)
+ err := eh.SubscribeP(id, "http.EventJobOK", ch)
+ require.NoError(t, err)
+
+ eh.Send(NewRREvent(EventJobOK, "foo", "http"))
+
+ evt := <-ch
+ require.Equal(t, "foo", evt.Message())
+ require.Equal(t, "http", evt.Plugin())
+ require.Equal(t, "EventJobOK", evt.Type().String())
+}
diff --git a/events/eventsbus.go b/events/eventsbus.go
index 79f5babd..cd0dca71 100644
--- a/events/eventsbus.go
+++ b/events/eventsbus.go
@@ -2,7 +2,10 @@ package events
import (
"fmt"
+ "strings"
"sync"
+
+ "github.com/spiral/errors"
)
type sub struct {
@@ -32,17 +35,39 @@ http.* <-
// SubscribeAll for all RR events
// returns subscriptionID
func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error {
+ if ch == nil {
+ return errors.Str("nil channel provided")
+ }
+
+ subIDTr := strings.Trim(subID, " ")
+
+ if subIDTr == "" {
+ return errors.Str("subscriberID can't be empty")
+ }
+
return eb.subscribe(subID, "*", ch)
}
// SubscribeP pattern like "pluginName.EventType"
func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error {
+ if ch == nil {
+ return errors.Str("nil channel provided")
+ }
+
+ subIDTr := strings.Trim(subID, " ")
+ patternTr := strings.Trim(pattern, " ")
+
+ if subIDTr == "" || patternTr == "" {
+ return errors.Str("subscriberID or pattern can't be empty")
+ }
+
return eb.subscribe(subID, pattern, ch)
}
func (eb *eventsBus) Unsubscribe(subID string) {
eb.subscribers.Delete(subID)
}
+
func (eb *eventsBus) UnsubscribeP(subID, pattern string) {
if sb, ok := eb.subscribers.Load(subID); ok {
eb.Lock()
@@ -64,9 +89,25 @@ func (eb *eventsBus) UnsubscribeP(subID, pattern string) {
// Send sends event to the events bus
func (eb *eventsBus) Send(ev Event) {
+ // do not accept nil events
+ if ev == nil {
+ return
+ }
+
eb.internalEvCh <- ev
}
+func (eb *eventsBus) Len() uint {
+ var ln uint
+
+ eb.subscribers.Range(func(key, value interface{}) bool {
+ ln++
+ return true
+ })
+
+ return ln
+}
+
func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error {
eb.Lock()
defer eb.Unlock()
@@ -105,7 +146,7 @@ func (eb *eventsBus) handleEvents() {
for { //nolint:gosimple
select {
case ev := <-eb.internalEvCh:
- //
+ // http.WorkerError for example
wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String())
eb.subscribers.Range(func(key, value interface{}) bool {
diff --git a/events/types.go b/events/types.go
index 09bbf7a7..d8e40084 100644
--- a/events/types.go
+++ b/events/types.go
@@ -1,19 +1,15 @@
package events
-import (
- "fmt"
-)
-
type EventBus interface {
SubscribeAll(subID string, ch chan<- Event) error
SubscribeP(subID string, pattern string, ch chan<- Event) error
Unsubscribe(subID string)
UnsubscribeP(subID, pattern string)
+ Len() uint
Send(ev Event)
}
type Event interface {
- fmt.Stringer
Plugin() string
Type() EventType
Message() string
@@ -30,6 +26,7 @@ type RREvent struct {
// NewRREvent initializes new event
func NewRREvent(t EventType, msg string, plugin string) *RREvent {
+ // get
return &RREvent{
T: t,
P: plugin,
@@ -37,10 +34,6 @@ func NewRREvent(t EventType, msg string, plugin string) *RREvent {
}
}
-func (r *RREvent) String() string {
- return "RoadRunner event"
-}
-
func (r *RREvent) Type() EventType {
return r.T
}
diff --git a/events/wildcard.go b/events/wildcard.go
index 171cbf9d..b4c28ae1 100644
--- a/events/wildcard.go
+++ b/events/wildcard.go
@@ -26,7 +26,7 @@ func newWildcard(pattern string) (*wildcard, error) {
if dotI == -1 {
// http.SuperEvent
- return nil, errors.Str("wrong wildcard, no * or .")
+ return nil, errors.Str("wrong wildcard, no * or . Usage: http.Event or *.Event or http.*")
}
return &wildcard{origin[0:dotI], origin[dotI+1:]}, nil