diff options
author | Valery Piashchynski <[email protected]> | 2021-06-02 17:25:09 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-02 17:25:09 +0300 |
commit | 12c031ce76c505128ebf9daafa91952855f202d4 (patch) | |
tree | 51846c0cd8a452246e383deb2ac00cce9ef1b92c /plugins/redis | |
parent | 352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (diff) |
- Switch from the json to flatbuffers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis')
-rw-r--r-- | plugins/redis/fanin.go | 7 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 28 |
2 files changed, 12 insertions, 23 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 6c9a5650..3082f24f 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -64,13 +64,6 @@ func (fi *FanIn) read() { if !ok { return } - //m := &pubsub.Message{} - //err := json.Unmarshal(utils.AsBytes(msg.Payload), m) - //if err != nil { - // fi.log.Error("failed to unmarshal payload", "error", err.Error()) - // continue - //} - fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0) case <-fi.exit: return diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 3a21204e..5b9de5fc 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,7 +6,6 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -110,9 +109,7 @@ func (p *Plugin) Publish(msg []byte) error { fbsMsg := message.GetRootAsMessage(msg, 0) for j := 0; j < fbsMsg.TopicsLength(); j++ { - t := fbsMsg.Table() - vec := t.ByteVector(0) - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec) + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) if f.Err() != nil { return f.Err() } @@ -122,17 +119,16 @@ func (p *Plugin) Publish(msg []byte) error { func (p *Plugin) PublishAsync(msg []byte) { go func() { - //p.Lock() - //defer p.Unlock() - //for i := 0; i < len(msg); i++ { - // for j := 0; j < len(msg[i].Topics); j++ { - // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i]) - // if f.Err() != nil { - // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error()) - // continue - // } - // } - //} + p.Lock() + defer p.Unlock() + fbsMsg := message.GetRootAsMessage(msg, 0) + for j := 0; j < fbsMsg.TopicsLength(); j++ { + f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + if f.Err() != nil { + p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) + return + } + } }() } @@ -145,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error { } // Next return next message -func (p *Plugin) Next() (*pubsub.Message, error) { +func (p *Plugin) Next() (*message.Message, error) { return <-p.fanin.Consume(), nil } |