summaryrefslogtreecommitdiff
path: root/plugins/redis/fanin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis/fanin.go')
-rw-r--r--plugins/redis/fanin.go10
1 files changed, 5 insertions, 5 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 76bef400..ac9ebcc2 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,7 +4,7 @@ import (
"context"
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
@@ -23,13 +23,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *message.Message
+ out chan *websocketsv1.Message
exit chan struct{}
}
func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *message.Message, 100)
+ out := make(chan *websocketsv1.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -67,7 +67,7 @@ func (fi *FanIn) read() {
return
}
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
if err != nil {
fi.log.Error("message unmarshal")
@@ -97,6 +97,6 @@ func (fi *FanIn) stop() error {
return nil
}
-func (fi *FanIn) consume() <-chan *message.Message {
+func (fi *FanIn) consume() <-chan *websocketsv1.Message {
return fi.out
}