diff options
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r-- | plugins/redis/plugin.go | 28 |
1 files changed, 12 insertions, 16 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 3a21204e..5b9de5fc 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,7 +6,6 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -110,9 +109,7 @@ func (p *Plugin) Publish(msg []byte) error { fbsMsg := message.GetRootAsMessage(msg, 0) for j := 0; j < fbsMsg.TopicsLength(); j++ { - t := fbsMsg.Table() - vec := t.ByteVector(0) - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec) + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) if f.Err() != nil { return f.Err() } @@ -122,17 +119,16 @@ func (p *Plugin) Publish(msg []byte) error { func (p *Plugin) PublishAsync(msg []byte) { go func() { - //p.Lock() - //defer p.Unlock() - //for i := 0; i < len(msg); i++ { - // for j := 0; j < len(msg[i].Topics); j++ { - // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - // if f.Err() != nil { - // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) - // continue - // } - // } - //} + p.Lock() + defer p.Unlock() + fbsMsg := message.GetRootAsMessage(msg, 0) + for j := 0; j < fbsMsg.TopicsLength(); j++ { + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + if f.Err() != nil { + p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) + return + } + } }() } @@ -145,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error { } // Next return next message -func (p *Plugin) Next() (*pubsub.Message, error) { +func (p *Plugin) Next() (*message.Message, error) { return <-p.fanin.Consume(), nil } |