summaryrefslogtreecommitdiff
path: root/plugins/redis
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-01 14:57:33 +0300
committerValery Piashchynski <[email protected]>2021-06-01 14:57:33 +0300
commit352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (patch)
treed940de0ee304d3edb60daa35568c3f186dc6a8b5 /plugins/redis
parent548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff)
- Initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis')
-rw-r--r--plugins/redis/fanin.go25
-rw-r--r--plugins/redis/plugin.go42
2 files changed, 35 insertions, 32 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 93b13124..6c9a5650 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,8 +4,7 @@ import (
"context"
"sync"
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/go-redis/redis/v8"
@@ -22,13 +21,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *pubsub.Message
+ out chan *message.Message
exit chan struct{}
}
func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *pubsub.Message, 100)
+ out := make(chan *message.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -65,14 +64,14 @@ func (fi *FanIn) read() {
if !ok {
return
}
- m := &pubsub.Message{}
- err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
- if err != nil {
- fi.log.Error("failed to unmarshal payload", "error", err.Error())
- continue
- }
-
- fi.out <- m
+ //m := &pubsub.Message{}
+ //err := json.Unmarshal(utils.AsBytes(msg.Payload), m)
+ //if err != nil {
+ // fi.log.Error("failed to unmarshal payload", "error", err.Error())
+ // continue
+ //}
+
+ fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
case <-fi.exit:
return
}
@@ -95,6 +94,6 @@ func (fi *FanIn) Stop() error {
return nil
}
-func (fi *FanIn) Consume() <-chan *pubsub.Message {
+func (fi *FanIn) Consume() <-chan *message.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index c1480de8..3a21204e 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -7,8 +7,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
const PluginName = "redis"
@@ -101,34 +103,36 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-func (p *Plugin) Publish(msg []*pubsub.Message) error {
+func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- if f.Err() != nil {
- return f.Err()
- }
+ fbsMsg := message.GetRootAsMessage(msg, 0)
+
+ for j := 0; j < fbsMsg.TopicsLength(); j++ {
+ t := fbsMsg.Table()
+ vec := t.ByteVector(0)
+ f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), vec)
+ if f.Err() != nil {
+ return f.Err()
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
+func (p *Plugin) PublishAsync(msg []byte) {
go func() {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
- if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
- continue
- }
- }
- }
+ //p.Lock()
+ //defer p.Unlock()
+ //for i := 0; i < len(msg); i++ {
+ // for j := 0; j < len(msg[i].Topics); j++ {
+ // f := p.universalClient.Publish(context.Background(), msg[i].Topics[j], msg[i])
+ // if f.Err() != nil {
+ // p.log.Error("errors publishing message", "topic", msg[i].Topics[j], "error", f.Err().Error())
+ // continue
+ // }
+ // }
+ //}
}()
}