summaryrefslogtreecommitdiff
path: root/plugins/redis
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis')
-rw-r--r--plugins/redis/fanin.go19
-rw-r--r--plugins/redis/plugin.go31
2 files changed, 34 insertions, 16 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 321bfaaa..ac9ebcc2 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,8 +4,9 @@ import (
"context"
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
@@ -22,13 +23,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *message.Message
+ out chan *websocketsv1.Message
exit chan struct{}
}
func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *message.Message, 100)
+ out := make(chan *websocketsv1.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -65,7 +66,15 @@ func (fi *FanIn) read() {
if !ok {
return
}
- fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
+
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("message unmarshal")
+ continue
+ }
+
+ fi.out <- m
case <-fi.exit:
return
}
@@ -88,6 +97,6 @@ func (fi *FanIn) stop() error {
return nil
}
-func (fi *FanIn) consume() <-chan *message.Message {
+func (fi *FanIn) consume() <-chan *websocketsv1.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index b2603a40..47ffeb39 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,10 +6,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const PluginName = "redis"
@@ -107,10 +107,14 @@ func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return errors.E(err)
+ }
- for j := 0; j < fbsMsg.TopicsLength(); j++ {
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
if f.Err() != nil {
return f.Err()
}
@@ -122,12 +126,17 @@ func (p *Plugin) PublishAsync(msg []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
- for j := 0; j < fbsMsg.TopicsLength(); j++ {
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ p.log.Error("message unmarshal error")
+ return
+ }
+
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error())
- return
+ p.log.Error("redis publish", "error", f.Err())
}
}
}()
@@ -200,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
return <-p.fanin.consume(), nil
}