summaryrefslogtreecommitdiff
path: root/plugins/broadcast/root/router.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast/root/router.go')
-rw-r--r--plugins/broadcast/root/router.go170
1 files changed, 0 insertions, 170 deletions
diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go
deleted file mode 100644
index 91137f8b..00000000
--- a/plugins/broadcast/root/router.go
+++ /dev/null
@@ -1,170 +0,0 @@
-package broadcast
-
-//import "github.com/gobwas/glob"
-
-// Router performs internal message routing to multiple subscribers.
-type Router struct {
- wildcard map[string]wildcard
- routes map[string][]chan *Message
-}
-
-// wildcard handles number of topics via glob pattern.
-type wildcard struct {
- //glob glob.Glob
- upstream []chan *Message
-}
-
-// helper for blocking join/leave flow
-type subscriber struct {
- upstream chan *Message
- done chan error
- topics []string
- pattern string
-}
-
-// NewRouter creates new topic and pattern router.
-func NewRouter() *Router {
- return &Router{
- wildcard: make(map[string]wildcard),
- routes: make(map[string][]chan *Message),
- }
-}
-
-// Dispatch to all connected topics.
-func (r *Router) Dispatch(msg *Message) {
- for _, w := range r.wildcard {
- if w.glob.Match(msg.Topic) {
- for _, upstream := range w.upstream {
- upstream <- msg
- }
- }
- }
-
- if routes, ok := r.routes[msg.Topic]; ok {
- for _, upstream := range routes {
- upstream <- msg
- }
- }
-}
-
-// Subscribe to topic and return list of newly assigned topics.
-func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) {
- newTopics = make([]string, 0)
- for _, topic := range topics {
- if _, ok := r.routes[topic]; !ok {
- r.routes[topic] = []chan *Message{upstream}
- if !r.collapsed(topic) {
- newTopics = append(newTopics, topic)
- }
- continue
- }
-
- joined := false
- for _, up := range r.routes[topic] {
- if up == upstream {
- joined = true
- break
- }
- }
-
- if !joined {
- r.routes[topic] = append(r.routes[topic], upstream)
- }
- }
-
- return newTopics
-}
-
-// Unsubscribe from given list of topics and return list of topics which are no longer claimed.
-func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) {
- dropTopics = make([]string, 0)
- for _, topic := range topics {
- if _, ok := r.routes[topic]; !ok {
- // no such topic, ignore
- continue
- }
-
- for i := range r.routes[topic] {
- if r.routes[topic][i] == upstream {
- r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...)
- break
- }
- }
-
- if len(r.routes[topic]) == 0 {
- delete(r.routes, topic)
-
- // standalone empty subscription
- if !r.collapsed(topic) {
- dropTopics = append(dropTopics, topic)
- }
- }
- }
-
- return dropTopics
-}
-
-// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in
-// case if blob is invalid.
-func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) {
- if w, ok := r.wildcard[pattern]; ok {
- joined := false
- for _, up := range w.upstream {
- if up == upstream {
- joined = true
- break
- }
- }
-
- if !joined {
- w.upstream = append(w.upstream, upstream)
- }
-
- return nil, nil
- }
-
- g, err := glob.Compile(pattern)
- if err != nil {
- return nil, err
- }
-
- r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}}
-
- return []string{pattern}, nil
-}
-
-// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed.
-func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) {
- // todo: store and return collapsed topics
-
- w, ok := r.wildcard[pattern]
- if !ok {
- // no such pattern
- return nil
- }
-
- for i, up := range w.upstream {
- if up == upstream {
- w.upstream[i] = w.upstream[len(w.upstream)-1]
- w.upstream[len(w.upstream)-1] = nil
- w.upstream = w.upstream[:len(w.upstream)-1]
-
- if len(w.upstream) == 0 {
- delete(r.wildcard, pattern)
- return []string{pattern}
- }
- }
- }
-
- return nil
-}
-
-func (r *Router) collapsed(topic string) bool {
- for _, w := range r.wildcard {
- if w.glob.Match(topic) {
- return true
- }
- }
-
- return false
-}