summaryrefslogtreecommitdiff
path: root/plugins/redis
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
committerValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
commit12c031ce76c505128ebf9daafa91952855f202d4 (patch)
tree51846c0cd8a452246e383deb2ac00cce9ef1b92c /plugins/redis
parent352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (diff)
- Switch from the json to flatbuffers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis')
-rw-r--r--plugins/redis/fanin.go7
-rw-r--r--plugins/redis/plugin.go28
2 files changed, 12 insertions, 23 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 6c9a5650..3082f24f 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -64,13 +64,6 @@ func (fi *FanIn) read() {
if !ok {
return
}
- //m := &pubsub.Message{}
- //err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
- //if err != nil {
- // fi.log.Error("failed to unmarshal payload", "error", err.Error())
- // continue
- //}
-
fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
case <-fi.exit:
return
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 3a21204e..5b9de5fc 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,7 +6,6 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -110,9 +109,7 @@ func (p *Plugin) Publish(msg []byte) error {
fbsMsg := message.GetRootAsMessage(msg, 0)
for j := 0; j < fbsMsg.TopicsLength(); j++ {
- t := fbsMsg.Table()
- vec := t.ByteVector(0)
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec)
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
if f.Err() != nil {
return f.Err()
}
@@ -122,17 +119,16 @@ func (p *Plugin) Publish(msg []byte) error {
func (p *Plugin) PublishAsync(msg []byte) {
go func() {
- //p.Lock()
- //defer p.Unlock()
- //for i := 0; i < len(msg); i++ {
- // for j := 0; j < len(msg[i].Topics); j++ {
- // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- // if f.Err() != nil {
- // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
- // continue
- // }
- // }
- //}
+ p.Lock()
+ defer p.Unlock()
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+ for j := 0; j < fbsMsg.TopicsLength(); j++ {
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error())
+ return
+ }
+ }
}()
}
@@ -145,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
}
// Next return next message
-func (p *Plugin) Next() (*pubsub.Message, error) {
+func (p *Plugin) Next() (*message.Message, error) {
return <-p.fanin.Consume(), nil
}