summaryrefslogtreecommitdiff
path: root/plugins/memory
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/memory')
-rw-r--r--plugins/memory/plugin.go14
1 files changed, 9 insertions, 5 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 6732ff5d..d724dff9 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -6,7 +6,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/bst"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -86,15 +86,19 @@ func (p *Plugin) Next() (*message.Message, error) {
p.RLock()
defer p.RUnlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
+ m := &message.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return nil, err
+ }
// push only messages, which are subscribed
// TODO better???
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ for i := 0; i < len(m.GetTopics()); i++ {
// if we have active subscribers - send a message to a topic
// or send nil instead
- if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok {
- return fbsMsg, nil
+ if ok := p.storage.Contains(m.GetTopics()[i]); ok {
+ return m, nil
}
}
return nil, nil