diff options
Diffstat (limited to 'plugins/broadcast/memory/memory.go')
-rw-r--r-- | plugins/broadcast/memory/memory.go | 131 |
1 files changed, 131 insertions, 0 deletions
diff --git a/plugins/broadcast/memory/memory.go b/plugins/broadcast/memory/memory.go new file mode 100644 index 00000000..5b85d68f --- /dev/null +++ b/plugins/broadcast/memory/memory.go @@ -0,0 +1,131 @@ +package memory + +import ( + "errors" + "sync/atomic" +) + +// Memory manages broadcasting in memory. +type Memory struct { + router *Router + messages chan *Message + join, leave chan subscriber + stop chan interface{} + stopped int32 +} + +// memoryBroker creates new memory based message broker. +func memoryBroker() *Memory { + return &Memory{ + router: NewRouter(), + messages: make(chan *Message), + join: make(chan subscriber), + leave: make(chan subscriber), + stop: make(chan interface{}), + stopped: 0, + } +} + +// Serve serves broker. +func (m *Memory) Serve() error { + for { + select { + case ctx := <-m.join: + ctx.done <- m.handleJoin(ctx) + case ctx := <-m.leave: + ctx.done <- m.handleLeave(ctx) + case msg := <-m.messages: + m.router.Dispatch(msg) + case <-m.stop: + return nil + } + } +} + +func (m *Memory) handleJoin(sub subscriber) (err error) { + if sub.pattern != "" { + _, err = m.router.SubscribePattern(sub.upstream, sub.pattern) + return err + } + + m.router.Subscribe(sub.upstream, sub.topics...) + return nil +} + +func (m *Memory) handleLeave(sub subscriber) error { + if sub.pattern != "" { + m.router.UnsubscribePattern(sub.upstream, sub.pattern) + return nil + } + + m.router.Unsubscribe(sub.upstream, sub.topics...) + return nil +} + +// Stop closes the consumption and disconnects broker. +func (m *Memory) Stop() { + if atomic.CompareAndSwapInt32(&m.stopped, 0, 1) { + close(m.stop) + } +} + +// Subscribe broker to one or multiple channels. +func (m *Memory) Subscribe(upstream chan *Message, topics ...string) error { + if atomic.LoadInt32(&m.stopped) == 1 { + return errors.New("broker has been stopped") + } + + ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)} + + m.join <- ctx + return <-ctx.done +} + +// SubscribePattern broker to pattern. +func (m *Memory) SubscribePattern(upstream chan *Message, pattern string) error { + if atomic.LoadInt32(&m.stopped) == 1 { + return errors.New("broker has been stopped") + } + + ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)} + + m.join <- ctx + return <-ctx.done +} + +// Unsubscribe broker from one or multiple channels. +func (m *Memory) Unsubscribe(upstream chan *Message, topics ...string) error { + if atomic.LoadInt32(&m.stopped) == 1 { + return errors.New("broker has been stopped") + } + + ctx := subscriber{upstream: upstream, topics: topics, done: make(chan error)} + + m.leave <- ctx + return <-ctx.done +} + +// UnsubscribePattern broker from pattern. +func (m *Memory) UnsubscribePattern(upstream chan *Message, pattern string) error { + if atomic.LoadInt32(&m.stopped) == 1 { + return errors.New("broker has been stopped") + } + + ctx := subscriber{upstream: upstream, pattern: pattern, done: make(chan error)} + + m.leave <- ctx + return <-ctx.done +} + +// Publish one or multiple Channel. +func (m *Memory) Publish(messages ...*Message) error { + if atomic.LoadInt32(&m.stopped) == 1 { + return errors.New("broker has been stopped") + } + + for _, msg := range messages { + m.messages <- msg + } + + return nil +} |