summaryrefslogtreecommitdiff
path: root/events/eventsbus.go
diff options
context:
space:
mode:
Diffstat (limited to 'events/eventsbus.go')
-rw-r--r--events/eventsbus.go43
1 files changed, 42 insertions, 1 deletions
diff --git a/events/eventsbus.go b/events/eventsbus.go
index 79f5babd..cd0dca71 100644
--- a/events/eventsbus.go
+++ b/events/eventsbus.go
@@ -2,7 +2,10 @@ package events
import (
"fmt"
+ "strings"
"sync"
+
+ "github.com/spiral/errors"
)
type sub struct {
@@ -32,17 +35,39 @@ http.* <-
// SubscribeAll for all RR events
// returns subscriptionID
func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error {
+ if ch == nil {
+ return errors.Str("nil channel provided")
+ }
+
+ subIDTr := strings.Trim(subID, " ")
+
+ if subIDTr == "" {
+ return errors.Str("subscriberID can't be empty")
+ }
+
return eb.subscribe(subID, "*", ch)
}
// SubscribeP pattern like "pluginName.EventType"
func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error {
+ if ch == nil {
+ return errors.Str("nil channel provided")
+ }
+
+ subIDTr := strings.Trim(subID, " ")
+ patternTr := strings.Trim(pattern, " ")
+
+ if subIDTr == "" || patternTr == "" {
+ return errors.Str("subscriberID or pattern can't be empty")
+ }
+
return eb.subscribe(subID, pattern, ch)
}
func (eb *eventsBus) Unsubscribe(subID string) {
eb.subscribers.Delete(subID)
}
+
func (eb *eventsBus) UnsubscribeP(subID, pattern string) {
if sb, ok := eb.subscribers.Load(subID); ok {
eb.Lock()
@@ -64,9 +89,25 @@ func (eb *eventsBus) UnsubscribeP(subID, pattern string) {
// Send sends event to the events bus
func (eb *eventsBus) Send(ev Event) {
+ // do not accept nil events
+ if ev == nil {
+ return
+ }
+
eb.internalEvCh <- ev
}
+func (eb *eventsBus) Len() uint {
+ var ln uint
+
+ eb.subscribers.Range(func(key, value interface{}) bool {
+ ln++
+ return true
+ })
+
+ return ln
+}
+
func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error {
eb.Lock()
defer eb.Unlock()
@@ -105,7 +146,7 @@ func (eb *eventsBus) handleEvents() {
for { //nolint:gosimple
select {
case ev := <-eb.internalEvCh:
- //
+ // http.WorkerError for example
wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String())
eb.subscribers.Range(func(key, value interface{}) bool {