summaryrefslogtreecommitdiff
path: root/plugins/redis/fanin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis/fanin.go')
-rw-r--r--plugins/redis/fanin.go102
1 files changed, 0 insertions, 102 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
deleted file mode 100644
index ac9ebcc2..00000000
--- a/plugins/redis/fanin.go
+++ /dev/null
@@ -1,102 +0,0 @@
-package redis
-
-import (
- "context"
- "sync"
-
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
-
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type FanIn struct {
- sync.Mutex
-
- // redis client
- client redis.UniversalClient
- pubsub *redis.PubSub
-
- log logger.Logger
-
- // out channel with all subs
- out chan *websocketsv1.Message
-
- exit chan struct{}
-}
-
-func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *websocketsv1.Message, 100)
- fi := &FanIn{
- out: out,
- client: redisClient,
- pubsub: redisClient.Subscribe(context.Background()),
- exit: make(chan struct{}),
- log: log,
- }
-
- // start reading messages
- go fi.read()
-
- return fi
-}
-
-func (fi *FanIn) sub(topics ...string) error {
- const op = errors.Op("fanin_addchannel")
- err := fi.pubsub.Subscribe(context.Background(), topics...)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-// read reads messages from the pubsub subscription
-func (fi *FanIn) read() {
- for {
- select {
- // here we receive message from us (which we sent before in Publish)
- // it should be compatible with the websockets.Msg interface
- // payload should be in the redis.message.payload field
-
- case msg, ok := <-fi.pubsub.Channel():
- // channel closed
- if !ok {
- return
- }
-
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
- if err != nil {
- fi.log.Error("message unmarshal")
- continue
- }
-
- fi.out <- m
- case <-fi.exit:
- return
- }
- }
-}
-
-func (fi *FanIn) unsub(topic string) error {
- const op = errors.Op("fanin_remove")
- err := fi.pubsub.Unsubscribe(context.Background(), topic)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-func (fi *FanIn) stop() error {
- fi.exit <- struct{}{}
- close(fi.out)
- close(fi.exit)
- return nil
-}
-
-func (fi *FanIn) consume() <-chan *websocketsv1.Message {
- return fi.out
-}