diff options
Diffstat (limited to 'plugins/broadcast/root/router.go')
-rw-r--r-- | plugins/broadcast/root/router.go | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go new file mode 100644 index 00000000..91137f8b --- /dev/null +++ b/plugins/broadcast/root/router.go @@ -0,0 +1,170 @@ +package broadcast + +//import "github.com/gobwas/glob" + +// Router performs internal message routing to multiple subscribers. +type Router struct { + wildcard map[string]wildcard + routes map[string][]chan *Message +} + +// wildcard handles number of topics via glob pattern. +type wildcard struct { + //glob glob.Glob + upstream []chan *Message +} + +// helper for blocking join/leave flow +type subscriber struct { + upstream chan *Message + done chan error + topics []string + pattern string +} + +// NewRouter creates new topic and pattern router. +func NewRouter() *Router { + return &Router{ + wildcard: make(map[string]wildcard), + routes: make(map[string][]chan *Message), + } +} + +// Dispatch to all connected topics. +func (r *Router) Dispatch(msg *Message) { + for _, w := range r.wildcard { + if w.glob.Match(msg.Topic) { + for _, upstream := range w.upstream { + upstream <- msg + } + } + } + + if routes, ok := r.routes[msg.Topic]; ok { + for _, upstream := range routes { + upstream <- msg + } + } +} + +// Subscribe to topic and return list of newly assigned topics. +func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) { + newTopics = make([]string, 0) + for _, topic := range topics { + if _, ok := r.routes[topic]; !ok { + r.routes[topic] = []chan *Message{upstream} + if !r.collapsed(topic) { + newTopics = append(newTopics, topic) + } + continue + } + + joined := false + for _, up := range r.routes[topic] { + if up == upstream { + joined = true + break + } + } + + if !joined { + r.routes[topic] = append(r.routes[topic], upstream) + } + } + + return newTopics +} + +// Unsubscribe from given list of topics and return list of topics which are no longer claimed. +func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) { + dropTopics = make([]string, 0) + for _, topic := range topics { + if _, ok := r.routes[topic]; !ok { + // no such topic, ignore + continue + } + + for i := range r.routes[topic] { + if r.routes[topic][i] == upstream { + r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...) + break + } + } + + if len(r.routes[topic]) == 0 { + delete(r.routes, topic) + + // standalone empty subscription + if !r.collapsed(topic) { + dropTopics = append(dropTopics, topic) + } + } + } + + return dropTopics +} + +// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in +// case if blob is invalid. +func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) { + if w, ok := r.wildcard[pattern]; ok { + joined := false + for _, up := range w.upstream { + if up == upstream { + joined = true + break + } + } + + if !joined { + w.upstream = append(w.upstream, upstream) + } + + return nil, nil + } + + g, err := glob.Compile(pattern) + if err != nil { + return nil, err + } + + r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}} + + return []string{pattern}, nil +} + +// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed. +func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) { + // todo: store and return collapsed topics + + w, ok := r.wildcard[pattern] + if !ok { + // no such pattern + return nil + } + + for i, up := range w.upstream { + if up == upstream { + w.upstream[i] = w.upstream[len(w.upstream)-1] + w.upstream[len(w.upstream)-1] = nil + w.upstream = w.upstream[:len(w.upstream)-1] + + if len(w.upstream) == 0 { + delete(r.wildcard, pattern) + return []string{pattern} + } + } + } + + return nil +} + +func (r *Router) collapsed(topic string) bool { + for _, w := range r.wildcard { + if w.glob.Match(topic) { + return true + } + } + + return false +} |