diff options
author | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
commit | 2dd30155de6faaf6005027d5337a840310c827f9 (patch) | |
tree | aa6f0ce2d2db2047b7e729b16dd70d721f4bae55 /plugins/redis/fanin.go | |
parent | 25dfc0d837827d0d1c729d323dd651ca6163fe09 (diff) |
- Update redis/memory pubsubs
- Rework internal message bus
- Add new tests for the broadcast plugin and include them into the GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/fanin.go')
-rw-r--r-- | plugins/redis/fanin.go | 21 |
1 files changed, 8 insertions, 13 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 0bdd4cf5..40a99d20 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,12 +4,10 @@ import ( "context" "sync" - "github.com/spiral/roadrunner/v2/plugins/logger" - websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" - "google.golang.org/protobuf/proto" - "github.com/go-redis/redis/v8" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -23,13 +21,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan *websocketsv1.Message + out chan *pubsub.Message exit chan struct{} } func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *websocketsv1.Message, 100) + out := make(chan *pubsub.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -67,14 +65,11 @@ func (fi *FanIn) read() { return } - m := &websocketsv1.Message{} - err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) - if err != nil { - fi.log.Error("message unmarshal") - continue + fi.out <- &pubsub.Message{ + Topic: msg.Channel, + Payload: utils.AsBytes(msg.Payload), } - fi.out <- m case <-fi.exit: return } @@ -97,6 +92,6 @@ func (fi *FanIn) stop() error { return nil } -func (fi *FanIn) consume() <-chan *websocketsv1.Message { +func (fi *FanIn) consume() <-chan *pubsub.Message { return fi.out } |