diff options
Diffstat (limited to 'plugins/redis/fanin.go')
-rw-r--r-- | plugins/redis/fanin.go | 11 |
1 files changed, 10 insertions, 1 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 321bfaaa..76bef400 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -6,6 +6,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" "github.com/go-redis/redis/v8" "github.com/spiral/errors" @@ -65,7 +66,15 @@ func (fi *FanIn) read() { if !ok { return } - fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0) + + m := &message.Message{} + err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) + if err != nil { + fi.log.Error("message unmarshal") + continue + } + + fi.out <- m case <-fi.exit: return } |