diff options
Diffstat (limited to 'events/eventsbus.go')
-rw-r--r-- | events/eventsbus.go | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/events/eventsbus.go b/events/eventsbus.go new file mode 100644 index 00000000..cd0dca71 --- /dev/null +++ b/events/eventsbus.go @@ -0,0 +1,170 @@ +package events + +import ( + "fmt" + "strings" + "sync" + + "github.com/spiral/errors" +) + +type sub struct { + pattern string + w *wildcard + events chan<- Event +} + +type eventsBus struct { + sync.RWMutex + subscribers sync.Map + internalEvCh chan Event + stop chan struct{} +} + +func newEventsBus() *eventsBus { + return &eventsBus{ + internalEvCh: make(chan Event, 100), + stop: make(chan struct{}), + } +} + +/* +http.* <- +*/ + +// SubscribeAll for all RR events +// returns subscriptionID +func (eb *eventsBus) SubscribeAll(subID string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + + if subIDTr == "" { + return errors.Str("subscriberID can't be empty") + } + + return eb.subscribe(subID, "*", ch) +} + +// SubscribeP pattern like "pluginName.EventType" +func (eb *eventsBus) SubscribeP(subID string, pattern string, ch chan<- Event) error { + if ch == nil { + return errors.Str("nil channel provided") + } + + subIDTr := strings.Trim(subID, " ") + patternTr := strings.Trim(pattern, " ") + + if subIDTr == "" || patternTr == "" { + return errors.Str("subscriberID or pattern can't be empty") + } + + return eb.subscribe(subID, pattern, ch) +} + +func (eb *eventsBus) Unsubscribe(subID string) { + eb.subscribers.Delete(subID) +} + +func (eb *eventsBus) UnsubscribeP(subID, pattern string) { + if sb, ok := eb.subscribers.Load(subID); ok { + eb.Lock() + defer eb.Unlock() + + sbArr := sb.([]*sub) + + for i := 0; i < len(sbArr); i++ { + if sbArr[i].pattern == pattern { + sbArr[i] = sbArr[len(sbArr)-1] + sbArr = sbArr[:len(sbArr)-1] + // replace with new array + eb.subscribers.Store(subID, sbArr) + return + } + } + } +} + +// Send sends event to the events bus +func (eb *eventsBus) Send(ev Event) { + // do not accept nil events + if ev == nil { + return + } + + eb.internalEvCh <- ev +} + +func (eb *eventsBus) Len() uint { + var ln uint + + eb.subscribers.Range(func(key, value interface{}) bool { + ln++ + return true + }) + + return ln +} + +func (eb *eventsBus) subscribe(subID string, pattern string, ch chan<- Event) error { + eb.Lock() + defer eb.Unlock() + w, err := newWildcard(pattern) + if err != nil { + return err + } + + if sb, ok := eb.subscribers.Load(subID); ok { + // at this point we are confident that sb is a []*sub type + subArr := sb.([]*sub) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil + } + + subArr := make([]*sub, 0, 5) + subArr = append(subArr, &sub{ + pattern: pattern, + w: w, + events: ch, + }) + + eb.subscribers.Store(subID, subArr) + + return nil +} + +func (eb *eventsBus) handleEvents() { + for { //nolint:gosimple + select { + case ev := <-eb.internalEvCh: + // http.WorkerError for example + wc := fmt.Sprintf("%s.%s", ev.Plugin(), ev.Type().String()) + + eb.subscribers.Range(func(key, value interface{}) bool { + vsub := value.([]*sub) + + for i := 0; i < len(vsub); i++ { + if vsub[i].w.match(wc) { + select { + case vsub[i].events <- ev: + return true + default: + return true + } + } + } + + return true + }) + } + } +} |