diff options
Diffstat (limited to 'plugins/memory/plugin.go')
-rw-r--r-- | plugins/memory/plugin.go | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 2ad041aa..49c187bc 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -15,14 +15,14 @@ type Plugin struct { log logger.Logger // channel with the messages from the RPC - pushCh chan pubsub.Message + pushCh chan *pubsub.Message // user-subscribed topics topics sync.Map } func (p *Plugin) Init(log logger.Logger) error { p.log = log - p.pushCh = make(chan pubsub.Message, 100) + p.pushCh = make(chan *pubsub.Message, 100) return nil } @@ -34,14 +34,14 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Publish(messages []pubsub.Message) error { +func (p *Plugin) Publish(messages []*pubsub.Message) error { for i := 0; i < len(messages); i++ { p.pushCh <- messages[i] } return nil } -func (p *Plugin) PublishAsync(messages []pubsub.Message) { +func (p *Plugin) PublishAsync(messages []*pubsub.Message) { go func() { for i := 0; i < len(messages); i++ { p.pushCh <- messages[i] @@ -63,12 +63,17 @@ func (p *Plugin) Unsubscribe(topics ...string) error { return nil } -func (p *Plugin) Next() (pubsub.Message, error) { +func (p *Plugin) Next() (*pubsub.Message, error) { msg := <-p.pushCh + + if msg == nil { + return nil, nil + } + // push only messages, which are subscribed // TODO better??? - for i := 0; i < len(msg.Topics()); i++ { - if _, ok := p.topics.Load(msg.Topics()[i]); ok { + for i := 0; i < len(msg.Topics); i++ { + if _, ok := p.topics.Load(msg.Topics[i]); ok { return msg, nil } } |