diff options
Diffstat (limited to 'plugins/broadcast/memory/memory.go')
-rw-r--r-- | plugins/broadcast/memory/memory.go | 131 |
1 files changed, 0 insertions, 131 deletions
diff --git a/plugins/broadcast/memory/memory.go b/plugins/broadcast/memory/memory.go deleted file mode 100644 index 5b85d68f..00000000 --- a/plugins/broadcast/memory/memory.go +++ /dev/null @@ -1,131 +0,0 @@ -package memory - -import ( - "errors" - "sync/atomic" -) - -// Memory manages broadcasting in memory. -type Memory struct { - router *Router - messages chan *Message - join, leave chan subscriber - stop chan interface{} - stopped int32 -} - -// memoryBroker creates new memory based message broker. -func memoryBroker() *Memory { - return &Memory{ - router: NewRouter(), - messages: make(chan *Message), - join: make(chan subscriber), - leave: make(chan subscriber), - stop: make(chan interface{}), - stopped: 0, - } -} - -// Serve serves broker. -func (m *Memory) Serve() error { - for { - select { - case ctx := <-m.join: - ctx.done <- m.handleJoin(ctx) - case ctx := <-m.leave: - ctx.done <- m.handleLeave(ctx) - case msg := <-m.messages: - m.router.Dispatch(msg) - case <-m.stop: - return nil - } - } -} - -func (m *Memory) handleJoin(sub subscriber) (err error) { - if sub.pattern != "" { - _, err = m.router.SubscribePattern(sub.upstream, sub.pattern) - return err - } - - m.router.Subscribe(sub.upstream, sub.topics...) - return nil -} - -func (m *Memory) handleLeave(sub subscriber) error { - if sub.pattern != "" { - m.router.UnsubscribePattern(sub.upstream, sub.pattern) - return nil - } - - m.router.Unsubscribe(sub.upstream, sub.topics...) - return nil -} - -// Stop closes the consumption and disconnects broker. -func (m *Memory) Stop() { - if atomic.CompareAndSwapInt32(&m.stopped, 0, 1) { - close(m.stop) - } -} - -// Subscribe broker to one or multiple channels. -func (m *Memory) Subscribe(upstream chan *Message, topics ...string) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)} - - m.join <- ctx - return <-ctx.done -} - -// SubscribePattern broker to pattern. -func (m *Memory) SubscribePattern(upstream chan *Message, pattern string) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)} - - m.join <- ctx - return <-ctx.done -} - -// Unsubscribe broker from one or multiple channels. -func (m *Memory) Unsubscribe(upstream chan *Message, topics ...string) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)} - - m.leave <- ctx - return <-ctx.done -} - -// UnsubscribePattern broker from pattern. -func (m *Memory) UnsubscribePattern(upstream chan *Message, pattern string) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)} - - m.leave <- ctx - return <-ctx.done -} - -// Publish one or multiple Channel. -func (m *Memory) Publish(messages ...*Message) error { - if atomic.LoadInt32(&m.stopped) == 1 { - return errors.New("broker has been stopped") - } - - for _, msg := range messages { - m.messages <- msg - } - - return nil -} |