summaryrefslogtreecommitdiff
path: root/plugins/broadcast/root/router.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast/root/router.go')
-rw-r--r--plugins/broadcast/root/router.go170
1 files changed, 170 insertions, 0 deletions
diff --git a/plugins/broadcast/root/router.go b/plugins/broadcast/root/router.go
new file mode 100644
index 00000000..91137f8b
--- /dev/null
+++ b/plugins/broadcast/root/router.go
@@ -0,0 +1,170 @@
+package broadcast
+
+//import "github.com/gobwas/glob"
+
+// Router performs internal message routing to multiple subscribers.
+type Router struct {
+ wildcard map[string]wildcard
+ routes map[string][]chan *Message
+}
+
+// wildcard handles number of topics via glob pattern.
+type wildcard struct {
+ //glob glob.Glob
+ upstream []chan *Message
+}
+
+// helper for blocking join/leave flow
+type subscriber struct {
+ upstream chan *Message
+ done chan error
+ topics []string
+ pattern string
+}
+
+// NewRouter creates new topic and pattern router.
+func NewRouter() *Router {
+ return &Router{
+ wildcard: make(map[string]wildcard),
+ routes: make(map[string][]chan *Message),
+ }
+}
+
+// Dispatch to all connected topics.
+func (r *Router) Dispatch(msg *Message) {
+ for _, w := range r.wildcard {
+ if w.glob.Match(msg.Topic) {
+ for _, upstream := range w.upstream {
+ upstream <- msg
+ }
+ }
+ }
+
+ if routes, ok := r.routes[msg.Topic]; ok {
+ for _, upstream := range routes {
+ upstream <- msg
+ }
+ }
+}
+
+// Subscribe to topic and return list of newly assigned topics.
+func (r *Router) Subscribe(upstream chan *Message, topics ...string) (newTopics []string) {
+ newTopics = make([]string, 0)
+ for _, topic := range topics {
+ if _, ok := r.routes[topic]; !ok {
+ r.routes[topic] = []chan *Message{upstream}
+ if !r.collapsed(topic) {
+ newTopics = append(newTopics, topic)
+ }
+ continue
+ }
+
+ joined := false
+ for _, up := range r.routes[topic] {
+ if up == upstream {
+ joined = true
+ break
+ }
+ }
+
+ if !joined {
+ r.routes[topic] = append(r.routes[topic], upstream)
+ }
+ }
+
+ return newTopics
+}
+
+// Unsubscribe from given list of topics and return list of topics which are no longer claimed.
+func (r *Router) Unsubscribe(upstream chan *Message, topics ...string) (dropTopics []string) {
+ dropTopics = make([]string, 0)
+ for _, topic := range topics {
+ if _, ok := r.routes[topic]; !ok {
+ // no such topic, ignore
+ continue
+ }
+
+ for i := range r.routes[topic] {
+ if r.routes[topic][i] == upstream {
+ r.routes[topic] = append(r.routes[topic][:i], r.routes[topic][i+1:]...)
+ break
+ }
+ }
+
+ if len(r.routes[topic]) == 0 {
+ delete(r.routes, topic)
+
+ // standalone empty subscription
+ if !r.collapsed(topic) {
+ dropTopics = append(dropTopics, topic)
+ }
+ }
+ }
+
+ return dropTopics
+}
+
+// SubscribePattern subscribes to glob parent and return true and return array of newly added patterns. Error in
+// case if blob is invalid.
+func (r *Router) SubscribePattern(upstream chan *Message, pattern string) (newPatterns []string, err error) {
+ if w, ok := r.wildcard[pattern]; ok {
+ joined := false
+ for _, up := range w.upstream {
+ if up == upstream {
+ joined = true
+ break
+ }
+ }
+
+ if !joined {
+ w.upstream = append(w.upstream, upstream)
+ }
+
+ return nil, nil
+ }
+
+ g, err := glob.Compile(pattern)
+ if err != nil {
+ return nil, err
+ }
+
+ r.wildcard[pattern] = wildcard{glob: g, upstream: []chan *Message{upstream}}
+
+ return []string{pattern}, nil
+}
+
+// UnsubscribePattern unsubscribe from the pattern and returns an array of patterns which are no longer claimed.
+func (r *Router) UnsubscribePattern(upstream chan *Message, pattern string) (dropPatterns []string) {
+ // todo: store and return collapsed topics
+
+ w, ok := r.wildcard[pattern]
+ if !ok {
+ // no such pattern
+ return nil
+ }
+
+ for i, up := range w.upstream {
+ if up == upstream {
+ w.upstream[i] = w.upstream[len(w.upstream)-1]
+ w.upstream[len(w.upstream)-1] = nil
+ w.upstream = w.upstream[:len(w.upstream)-1]
+
+ if len(w.upstream) == 0 {
+ delete(r.wildcard, pattern)
+ return []string{pattern}
+ }
+ }
+ }
+
+ return nil
+}
+
+func (r *Router) collapsed(topic string) bool {
+ for _, w := range r.wildcard {
+ if w.glob.Match(topic) {
+ return true
+ }
+ }
+
+ return false
+}