diff options
Diffstat (limited to 'plugins/broadcast/root/router.go')
-rw-r--r-- | plugins/broadcast/root/router.go | 170 |
1 files changed, 0 insertions, 170 deletions
diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go deleted file mode 100644 index 91137f8b..00000000 --- a/plugins/broadcast/root/router.go +++ /dev/null @@ -1,170 +0,0 @@ -package broadcast - -//import "github.com/gobwas/glob" - -// Router performs internal message routing to multiple subscribers. -type Router struct { - wildcard map[string]wildcard - routes map[string][]chan *Message -} - -// wildcard handles number of topics via glob pattern. -type wildcard struct { - //glob glob.Glob - upstream []chan *Message -} - -// helper for blocking join/leave flow -type subscriber struct { - upstream chan *Message - done chan error - topics []string - pattern string -} - -// NewRouter creates new topic and pattern router. -func NewRouter() *Router { - return &Router{ - wildcard: make(map[string]wildcard), - routes: make(map[string][]chan *Message), - } -} - -// Dispatch to all connected topics. -func (r *Router) Dispatch(msg *Message) { - for _, w := range r.wildcard { - if w.glob.Match(msg.Topic) { - for _, upstream := range w.upstream { - upstream <- msg - } - } - } - - if routes, ok := r.routes[msg.Topic]; ok { - for _, upstream := range routes { - upstream <- msg - } - } -} - -// Subscribe to topic and return list of newly assigned topics. -func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) { - newTopics = make([]string, 0) - for _, topic := range topics { - if _, ok := r.routes[topic]; !ok { - r.routes[topic] = []chan *Message{upstream} - if !r.collapsed(topic) { - newTopics = append(newTopics, topic) - } - continue - } - - joined := false - for _, up := range r.routes[topic] { - if up == upstream { - joined = true - break - } - } - - if !joined { - r.routes[topic] = append(r.routes[topic], upstream) - } - } - - return newTopics -} - -// Unsubscribe from given list of topics and return list of topics which are no longer claimed. -func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) { - dropTopics = make([]string, 0) - for _, topic := range topics { - if _, ok := r.routes[topic]; !ok { - // no such topic, ignore - continue - } - - for i := range r.routes[topic] { - if r.routes[topic][i] == upstream { - r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...) - break - } - } - - if len(r.routes[topic]) == 0 { - delete(r.routes, topic) - - // standalone empty subscription - if !r.collapsed(topic) { - dropTopics = append(dropTopics, topic) - } - } - } - - return dropTopics -} - -// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in -// case if blob is invalid. -func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) { - if w, ok := r.wildcard[pattern]; ok { - joined := false - for _, up := range w.upstream { - if up == upstream { - joined = true - break - } - } - - if !joined { - w.upstream = append(w.upstream, upstream) - } - - return nil, nil - } - - g, err := glob.Compile(pattern) - if err != nil { - return nil, err - } - - r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}} - - return []string{pattern}, nil -} - -// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed. -func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) { - // todo: store and return collapsed topics - - w, ok := r.wildcard[pattern] - if !ok { - // no such pattern - return nil - } - - for i, up := range w.upstream { - if up == upstream { - w.upstream[i] = w.upstream[len(w.upstream)-1] - w.upstream[len(w.upstream)-1] = nil - w.upstream = w.upstream[:len(w.upstream)-1] - - if len(w.upstream) == 0 { - delete(r.wildcard, pattern) - return []string{pattern} - } - } - } - - return nil -} - -func (r *Router) collapsed(topic string) bool { - for _, w := range r.wildcard { - if w.glob.Match(topic) { - return true - } - } - - return false -} |