summaryrefslogtreecommitdiff
path: root/plugins/broadcast/root/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast/root/client.go')
-rw-r--r--plugins/broadcast/root/client.go133
1 files changed, 133 insertions, 0 deletions
diff --git a/plugins/broadcast/root/client.go b/plugins/broadcast/root/client.go
new file mode 100644
index 00000000..c5761f94
--- /dev/null
+++ b/plugins/broadcast/root/client.go
@@ -0,0 +1,133 @@
+package broadcast
+
+import "sync"
+
+// Client subscribes to a given topic and consumes or publish messages to it.
+type Client struct {
+ upstream chan *Message
+ broker Broker
+ mu sync.Mutex
+ topics []string
+ patterns []string
+}
+
+// Channel returns incoming messages channel.
+func (c *Client) Channel() chan *Message {
+ return c.upstream
+}
+
+// Publish message into associated topic or topics.
+func (c *Client) Publish(msg ...*Message) error {
+ return c.broker.Publish(msg...)
+}
+
+// Subscribe client to specific topics.
+func (c *Client) Subscribe(topics ...string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ newTopics := make([]string, 0)
+ for _, topic := range topics {
+ found := false
+ for _, e := range c.topics {
+ if e == topic {
+ found = true
+ break
+ }
+ }
+
+ if !found {
+ newTopics = append(newTopics, topic)
+ }
+ }
+
+ if len(newTopics) == 0 {
+ return nil
+ }
+
+ c.topics = append(c.topics, newTopics...)
+
+ return c.broker.Subscribe(c.upstream, newTopics...)
+}
+
+// SubscribePattern subscribe client to the specific topic pattern.
+func (c *Client) SubscribePattern(pattern string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for _, g := range c.patterns {
+ if g == pattern {
+ return nil
+ }
+ }
+
+ c.patterns = append(c.patterns, pattern)
+ return c.broker.SubscribePattern(c.upstream, pattern)
+}
+
+// Unsubscribe client from specific topics
+func (c *Client) Unsubscribe(topics ...string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ dropTopics := make([]string, 0)
+ for _, topic := range topics {
+ for i, e := range c.topics {
+ if e == topic {
+ c.topics = append(c.topics[:i], c.topics[i+1:]...)
+ dropTopics = append(dropTopics, topic)
+ }
+ }
+ }
+
+ if len(dropTopics) == 0 {
+ return nil
+ }
+
+ return c.broker.Unsubscribe(c.upstream, dropTopics...)
+}
+
+// UnsubscribePattern client from topic pattern.
+func (c *Client) UnsubscribePattern(pattern string) error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ for i := range c.patterns {
+ if c.patterns[i] == pattern {
+ c.patterns = append(c.patterns[:i], c.patterns[i+1:]...)
+
+ return c.broker.UnsubscribePattern(c.upstream, pattern)
+ }
+ }
+
+ return nil
+}
+
+// Topics return all the topics client subscribed to.
+func (c *Client) Topics() []string {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ return c.topics
+}
+
+// Patterns return all the patterns client subscribed to.
+func (c *Client) Patterns() []string {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ return c.patterns
+}
+
+// Close the client and consumption.
+func (c *Client) Close() (err error) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if len(c.topics) != 0 {
+ err = c.broker.Unsubscribe(c.upstream, c.topics...)
+ }
+
+ close(c.upstream)
+ return err
+}