diff options
Diffstat (limited to 'plugins/redis')
-rw-r--r-- | plugins/redis/fanin.go | 19 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 31 |
2 files changed, 34 insertions, 16 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 321bfaaa..ac9ebcc2 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,8 +4,9 @@ import ( "context" "sync" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" "github.com/go-redis/redis/v8" "github.com/spiral/errors" @@ -22,13 +23,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan *message.Message + out chan *websocketsv1.Message exit chan struct{} } func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *message.Message, 100) + out := make(chan *websocketsv1.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -65,7 +66,15 @@ func (fi *FanIn) read() { if !ok { return } - fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0) + + m := &websocketsv1.Message{} + err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) + if err != nil { + fi.log.Error("message unmarshal") + continue + } + + fi.out <- m case <-fi.exit: return } @@ -88,6 +97,6 @@ func (fi *FanIn) stop() error { return nil } -func (fi *FanIn) consume() <-chan *message.Message { +func (fi *FanIn) consume() <-chan *websocketsv1.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index b2603a40..47ffeb39 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,10 +6,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/protobuf/proto" ) const PluginName = "redis" @@ -107,10 +107,14 @@ func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(msg, 0) + m := &websocketsv1.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return errors.E(err) + } - for j := 0; j < fbsMsg.TopicsLength(); j++ { - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) if f.Err() != nil { return f.Err() } @@ -122,12 +126,17 @@ func (p *Plugin) PublishAsync(msg []byte) { go func() { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(msg, 0) - for j := 0; j < fbsMsg.TopicsLength(); j++ { - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + m := &websocketsv1.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + p.log.Error("message unmarshal error") + return + } + + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) if f.Err() != nil { - p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) - return + p.log.Error("redis publish", "error", f.Err()) } } }() @@ -200,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *Plugin) Next() (*message.Message, error) { +func (p *Plugin) Next() (*websocketsv1.Message, error) { return <-p.fanin.consume(), nil } |