diff options
author | Valery Piashchynski <[email protected]> | 2021-06-08 22:04:28 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-08 22:04:28 +0300 |
commit | cc271dceb13d3929f0382311dfce3dfed2ce04ce (patch) | |
tree | 13c4c3f380d8309b95c9600cc2000d1d5ab87cda /plugins/redis | |
parent | a8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (diff) |
- Add protobuf versioning
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis')
-rw-r--r-- | plugins/redis/fanin.go | 10 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 8 |
2 files changed, 9 insertions, 9 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 76bef400..ac9ebcc2 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,7 +4,7 @@ import ( "context" "sync" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" @@ -23,13 +23,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan *message.Message + out chan *websocketsv1.Message exit chan struct{} } func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *message.Message, 100) + out := make(chan *websocketsv1.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -67,7 +67,7 @@ func (fi *FanIn) read() { return } - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) if err != nil { fi.log.Error("message unmarshal") @@ -97,6 +97,6 @@ func (fi *FanIn) stop() error { return nil } -func (fi *FanIn) consume() <-chan *message.Message { +func (fi *FanIn) consume() <-chan *websocketsv1.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 695e7b08..47ffeb39 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" @@ -107,7 +107,7 @@ func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(msg, m) if err != nil { return errors.E(err) @@ -126,7 +126,7 @@ func (p *Plugin) PublishAsync(msg []byte) { go func() { p.Lock() defer p.Unlock() - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(msg, m) if err != nil { p.log.Error("message unmarshal error") @@ -209,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *Plugin) Next() (*message.Message, error) { +func (p *Plugin) Next() (*websocketsv1.Message, error) { return <-p.fanin.consume(), nil } |