diff options
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r-- | plugins/redis/plugin.go | 27 |
1 files changed, 18 insertions, 9 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index b2603a40..695e7b08 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -9,7 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/protobuf/proto" ) const PluginName = "redis" @@ -107,10 +107,14 @@ func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(msg, 0) + m := &message.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return errors.E(err) + } - for j := 0; j < fbsMsg.TopicsLength(); j++ { - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) if f.Err() != nil { return f.Err() } @@ -122,12 +126,17 @@ func (p *Plugin) PublishAsync(msg []byte) { go func() { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(msg, 0) - for j := 0; j < fbsMsg.TopicsLength(); j++ { - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + m := &message.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + p.log.Error("message unmarshal error") + return + } + + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) if f.Err() != nil { - p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) - return + p.log.Error("redis publish", "error", f.Err()) } } }() |