diff options
author | Valery Piashchynski <[email protected]> | 2021-06-02 17:25:09 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-02 17:25:09 +0300 |
commit | 12c031ce76c505128ebf9daafa91952855f202d4 (patch) | |
tree | 51846c0cd8a452246e383deb2ac00cce9ef1b92c /plugins/redis/plugin.go | |
parent | 352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (diff) |
- Switch from the json to flatbuffers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r-- | plugins/redis/plugin.go | 28 |
1 files changed, 12 insertions, 16 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 3a21204e..5b9de5fc 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,7 +6,6 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -110,9 +109,7 @@ func (p *Plugin) Publish(msg []byte) error { fbsMsg := message.GetRootAsMessage(msg, 0) for j := 0; j < fbsMsg.TopicsLength(); j++ { - t := fbsMsg.Table() - vec := t.ByteVector(0) - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec) + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) if f.Err() != nil { return f.Err() } @@ -122,17 +119,16 @@ func (p *Plugin) Publish(msg []byte) error { func (p *Plugin) PublishAsync(msg []byte) { go func() { - //p.Lock() - //defer p.Unlock() - //for i := 0; i < len(msg); i++ { - // for j := 0; j < len(msg[i].Topics); j++ { - // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - // if f.Err() != nil { - // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) - // continue - // } - // } - //} + p.Lock() + defer p.Unlock() + fbsMsg := message.GetRootAsMessage(msg, 0) + for j := 0; j < fbsMsg.TopicsLength(); j++ { + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + if f.Err() != nil { + p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) + return + } + } }() } @@ -145,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error { } // Next return next message -func (p *Plugin) Next() (*pubsub.Message, error) { +func (p *Plugin) Next() (*message.Message, error) { return <-p.fanin.Consume(), nil } |