diff options
Diffstat (limited to 'events/eventsbus.go')
-rw-r--r-- | events/eventsbus.go | 43 |
1 files changed, 42 insertions, 1 deletions
diff --git a/events/eventsbus.go b/events/eventsbus.go index 79f5babd..cd0dca71 100644 --- a/events/eventsbus.go +++ b/events/eventsbus.go @@ -2,7 +2,10 @@ package events import ( "fmt" + "strings" "sync" + + "github.com/spiral/errors" ) type sub struct { @@ -32,17 +35,39 @@ http.* <- // SubscribeAll for all RR events // returns subscriptionID func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + + if subIDTr == "" { + return errors.Str("subscriberID can't be empty") + } + return eb.subscribe(subID, "*", ch) } // SubscribeP pattern like "pluginName.EventType" func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + patternTr := strings.Trim(pattern, " ") + + if subIDTr == "" || patternTr == "" { + return errors.Str("subscriberID or pattern can't be empty") + } + return eb.subscribe(subID, pattern, ch) } func (eb *eventsBus) Unsubscribe(subID string) { eb.subscribers.Delete(subID) } + func (eb *eventsBus) UnsubscribeP(subID, pattern string) { if sb, ok := eb.subscribers.Load(subID); ok { eb.Lock() @@ -64,9 +89,25 @@ func (eb *eventsBus) UnsubscribeP(subID, pattern string) { // Send sends event to the events bus func (eb *eventsBus) Send(ev Event) { + // do not accept nil events + if ev == nil { + return + } + eb.internalEvCh <- ev } +func (eb *eventsBus) Len() uint { + var ln uint + + eb.subscribers.Range(func(key, value interface{}) bool { + ln++ + return true + }) + + return ln +} + func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error { eb.Lock() defer eb.Unlock() @@ -105,7 +146,7 @@ func (eb *eventsBus) handleEvents() { for { //nolint:gosimple select { case ev := <-eb.internalEvCh: - // + // http.WorkerError for example wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String()) eb.subscribers.Range(func(key, value interface{}) bool { |