summaryrefslogtreecommitdiff
path: root/plugins/memory/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/memory/plugin.go')
-rw-r--r--plugins/memory/plugin.go19
1 files changed, 12 insertions, 7 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 2ad041aa..49c187bc 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -15,14 +15,14 @@ type Plugin struct {
log logger.Logger
// channel with the messages from the RPC
- pushCh chan pubsub.Message
+ pushCh chan *pubsub.Message
// user-subscribed topics
topics sync.Map
}
func (p *Plugin) Init(log logger.Logger) error {
p.log = log
- p.pushCh = make(chan pubsub.Message, 100)
+ p.pushCh = make(chan *pubsub.Message, 100)
return nil
}
@@ -34,14 +34,14 @@ func (p *Plugin) Name() string {
return PluginName
}
-func (p *Plugin) Publish(messages []pubsub.Message) error {
+func (p *Plugin) Publish(messages []*pubsub.Message) error {
for i := 0; i < len(messages); i++ {
p.pushCh <- messages[i]
}
return nil
}
-func (p *Plugin) PublishAsync(messages []pubsub.Message) {
+func (p *Plugin) PublishAsync(messages []*pubsub.Message) {
go func() {
for i := 0; i < len(messages); i++ {
p.pushCh <- messages[i]
@@ -63,12 +63,17 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
return nil
}
-func (p *Plugin) Next() (pubsub.Message, error) {
+func (p *Plugin) Next() (*pubsub.Message, error) {
msg := <-p.pushCh
+
+ if msg == nil {
+ return nil, nil
+ }
+
// push only messages, which are subscribed
// TODO better???
- for i := 0; i < len(msg.Topics()); i++ {
- if _, ok := p.topics.Load(msg.Topics()[i]); ok {
+ for i := 0; i < len(msg.Topics); i++ {
+ if _, ok := p.topics.Load(msg.Topics[i]); ok {
return msg, nil
}
}