diff options
author | Valery Piashchynski <[email protected]> | 2021-06-02 19:16:36 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-02 19:16:36 +0300 |
commit | a99c14abb333c10a9142cd2f178e001f1b1726fb (patch) | |
tree | ec46ffb3db177f9aacef75d9c7bdcd6d894bf20c /plugins/redis/plugin.go | |
parent | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff) | |
parent | 27295b35e4f2702bf73d8ab10d10b84e527daf2b (diff) |
#698 feat(ws): replace `json` with binary flatbuffers
#698 feat(ws): replace `json` with binary flatbuffers
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r-- | plugins/redis/plugin.go | 34 |
1 files changed, 17 insertions, 17 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index c1480de8..5b9de5fc 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,9 +6,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) const PluginName = "redis" @@ -101,32 +102,31 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -func (p *Plugin) Publish(msg []*pubsub.Message) error { +func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - if f.Err() != nil { - return f.Err() - } + fbsMsg := message.GetRootAsMessage(msg, 0) + + for j := 0; j < fbsMsg.TopicsLength(); j++ { + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + if f.Err() != nil { + return f.Err() } } return nil } -func (p *Plugin) PublishAsync(msg []*pubsub.Message) { +func (p *Plugin) PublishAsync(msg []byte) { go func() { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - if f.Err() != nil { - p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) - continue - } + fbsMsg := message.GetRootAsMessage(msg, 0) + for j := 0; j < fbsMsg.TopicsLength(); j++ { + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + if f.Err() != nil { + p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) + return } } }() @@ -141,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error { } // Next return next message -func (p *Plugin) Next() (*pubsub.Message, error) { +func (p *Plugin) Next() (*message.Message, error) { return <-p.fanin.Consume(), nil } |