diff options
Diffstat (limited to 'plugins/redis/pubsub/channel.go')
-rw-r--r-- | plugins/redis/pubsub/channel.go | 97 |
1 files changed, 0 insertions, 97 deletions
diff --git a/plugins/redis/pubsub/channel.go b/plugins/redis/pubsub/channel.go deleted file mode 100644 index a1655ab2..00000000 --- a/plugins/redis/pubsub/channel.go +++ /dev/null @@ -1,97 +0,0 @@ -package pubsub - -import ( - "context" - "sync" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -type redisChannel struct { - sync.Mutex - - // redis client - client redis.UniversalClient - pubsub *redis.PubSub - - log logger.Logger - - // out channel with all subs - out chan *pubsub.Message - - exit chan struct{} -} - -func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { - out := make(chan *pubsub.Message, 100) - fi := &redisChannel{ - out: out, - client: redisClient, - pubsub: redisClient.Subscribe(context.Background()), - exit: make(chan struct{}), - log: log, - } - - // start reading messages - go fi.read() - - return fi -} - -func (r *redisChannel) sub(topics ...string) error { - const op = errors.Op("redis_sub") - err := r.pubsub.Subscribe(context.Background(), topics...) - if err != nil { - return errors.E(op, err) - } - return nil -} - -// read reads messages from the pubsub subscription -func (r *redisChannel) read() { - for { - select { - // here we receive message from us (which we sent before in Publish) - // it should be compatible with the pubsub.Message structure - // payload should be in the redis.message.payload field - - case msg, ok := <-r.pubsub.Channel(): - // channel closed - if !ok { - return - } - - r.out <- &pubsub.Message{ - Topic: msg.Channel, - Payload: utils.AsBytes(msg.Payload), - } - - case <-r.exit: - return - } - } -} - -func (r *redisChannel) unsub(topic string) error { - const op = errors.Op("redis_unsub") - err := r.pubsub.Unsubscribe(context.Background(), topic) - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (r *redisChannel) stop() error { - r.exit <- struct{}{} - close(r.out) - close(r.exit) - return nil -} - -func (r *redisChannel) message() chan *pubsub.Message { - return r.out -} |