diff options
Diffstat (limited to 'plugins/memory')
-rw-r--r-- | plugins/memory/plugin.go | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 6732ff5d..d724dff9 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -6,7 +6,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/bst" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/protobuf/proto" ) const ( @@ -86,15 +86,19 @@ func (p *Plugin) Next() (*message.Message, error) { p.RLock() defer p.RUnlock() - fbsMsg := message.GetRootAsMessage(msg, 0) + m := &message.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return nil, err + } // push only messages, which are subscribed // TODO better??? - for i := 0; i < fbsMsg.TopicsLength(); i++ { + for i := 0; i < len(m.GetTopics()); i++ { // if we have active subscribers - send a message to a topic // or send nil instead - if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok { - return fbsMsg, nil + if ok := p.storage.Contains(m.GetTopics()[i]); ok { + return m, nil } } return nil, nil |