diff options
author | Valery Piashchynski <[email protected]> | 2021-06-21 00:34:53 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-21 00:34:53 +0300 |
commit | 2ab22ac9e935efb126b51e9c3521073e6a5155a1 (patch) | |
tree | 5c6b1d4ee2aea4e6a1cc828ca1fcb2306ef9741e /plugins/redis/channel.go | |
parent | 18c072d5dbe3ca96fe2198f323d3bf520972e80f (diff) |
- Minor tests improvenments
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/channel.go')
-rw-r--r-- | plugins/redis/channel.go | 97 |
1 files changed, 97 insertions, 0 deletions
diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go new file mode 100644 index 00000000..5817853c --- /dev/null +++ b/plugins/redis/channel.go @@ -0,0 +1,97 @@ +package redis + +import ( + "context" + "sync" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +type redisChannel struct { + sync.Mutex + + // redis client + client redis.UniversalClient + pubsub *redis.PubSub + + log logger.Logger + + // out channel with all subs + out chan *pubsub.Message + + exit chan struct{} +} + +func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { + out := make(chan *pubsub.Message, 100) + fi := &redisChannel{ + out: out, + client: redisClient, + pubsub: redisClient.Subscribe(context.Background()), + exit: make(chan struct{}), + log: log, + } + + // start reading messages + go fi.read() + + return fi +} + +func (r *redisChannel) sub(topics ...string) error { + const op = errors.Op("redis_sub") + err := r.pubsub.Subscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +// read reads messages from the pubsub subscription +func (r *redisChannel) read() { + for { + select { + // here we receive message from us (which we sent before in Publish) + // it should be compatible with the pubsub.Message structure + // payload should be in the redis.message.payload field + + case msg, ok := <-r.pubsub.Channel(): + // channel closed + if !ok { + return + } + + r.out <- &pubsub.Message{ + Topic: msg.Channel, + Payload: utils.AsBytes(msg.Payload), + } + + case <-r.exit: + return + } + } +} + +func (r *redisChannel) unsub(topic string) error { + const op = errors.Op("redis_unsub") + err := r.pubsub.Unsubscribe(context.Background(), topic) + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (r *redisChannel) stop() error { + r.exit <- struct{}{} + close(r.out) + close(r.exit) + return nil +} + +func (r *redisChannel) message() *pubsub.Message { + return <-r.out +} |