summaryrefslogtreecommitdiff
path: root/plugins/memory/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/memory/plugin.go')
-rw-r--r--plugins/memory/plugin.go13
1 files changed, 8 insertions, 5 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index b9c5933a..eb87b39e 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -3,8 +3,9 @@ package memory
import (
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -59,18 +60,20 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
return nil
}
-func (p *Plugin) Next() (*pubsub.Message, error) {
+func (p *Plugin) Next() (*message.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
}
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+
// push only messages, which are subscribed
// TODO better???
- for i := 0; i < len(msg.Topics); i++ {
- if _, ok := p.topics.Load(msg.Topics[i]); ok {
- return msg, nil
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok {
+ return fbsMsg, nil
}
}
return nil, nil