summaryrefslogtreecommitdiff
path: root/plugins/redis/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-02 19:16:36 +0300
committerGitHub <[email protected]>2021-06-02 19:16:36 +0300
commita99c14abb333c10a9142cd2f178e001f1b1726fb (patch)
treeec46ffb3db177f9aacef75d9c7bdcd6d894bf20c /plugins/redis/plugin.go
parent548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff)
parent27295b35e4f2702bf73d8ab10d10b84e527daf2b (diff)
#698 feat(ws): replace `json` with binary flatbuffers
#698 feat(ws): replace `json` with binary flatbuffers
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r--plugins/redis/plugin.go34
1 files changed, 17 insertions, 17 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index c1480de8..5b9de5fc 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,9 +6,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const PluginName = "redis"
@@ -101,32 +102,31 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-func (p *Plugin) Publish(msg []*pubsub.Message) error {
+func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- if f.Err() != nil {
- return f.Err()
- }
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+
+ for j := 0; j < fbsMsg.TopicsLength(); j++ {
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ if f.Err() != nil {
+ return f.Err()
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
+func (p *Plugin) PublishAsync(msg []byte) {
go func() {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
- continue
- }
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+ for j := 0; j < fbsMsg.TopicsLength(); j++ {
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ if f.Err() != nil {
+ p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error())
+ return
}
}
}()
@@ -141,6 +141,6 @@ func (p *Plugin) Unsubscribe(topics ...string) error {
}
// Next return next message
-func (p *Plugin) Next() (*pubsub.Message, error) {
+func (p *Plugin) Next() (*message.Message, error) {
return <-p.fanin.Consume(), nil
}