summaryrefslogtreecommitdiff
path: root/plugins/redis/pubsub/channel.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis/pubsub/channel.go')
-rw-r--r--plugins/redis/pubsub/channel.go97
1 files changed, 97 insertions, 0 deletions
diff --git a/plugins/redis/pubsub/channel.go b/plugins/redis/pubsub/channel.go
new file mode 100644
index 00000000..eef5a7b9
--- /dev/null
+++ b/plugins/redis/pubsub/channel.go
@@ -0,0 +1,97 @@
+package pubsub
+
+import (
+ "context"
+ "sync"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+type redisChannel struct {
+ sync.Mutex
+
+ // redis client
+ client redis.UniversalClient
+ pubsub *redis.PubSub
+
+ log logger.Logger
+
+ // out channel with all subs
+ out chan *pubsub.Message
+
+ exit chan struct{}
+}
+
+func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel {
+ out := make(chan *pubsub.Message, 100)
+ fi := &redisChannel{
+ out: out,
+ client: redisClient,
+ pubsub: redisClient.Subscribe(context.Background()),
+ exit: make(chan struct{}),
+ log: log,
+ }
+
+ // start reading messages
+ go fi.read()
+
+ return fi
+}
+
+func (r *redisChannel) sub(topics ...string) error {
+ const op = errors.Op("redis_sub")
+ err := r.pubsub.Subscribe(context.Background(), topics...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+// read reads messages from the pubsub subscription
+func (r *redisChannel) read() {
+ for {
+ select {
+ // here we receive message from us (which we sent before in Publish)
+ // it should be compatible with the pubsub.Message structure
+ // payload should be in the redis.message.payload field
+
+ case msg, ok := <-r.pubsub.Channel():
+ // channel closed
+ if !ok {
+ return
+ }
+
+ r.out <- &pubsub.Message{
+ Topic: msg.Channel,
+ Payload: utils.AsBytes(msg.Payload),
+ }
+
+ case <-r.exit:
+ return
+ }
+ }
+}
+
+func (r *redisChannel) unsub(topic string) error {
+ const op = errors.Op("redis_unsub")
+ err := r.pubsub.Unsubscribe(context.Background(), topic)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (r *redisChannel) stop() error {
+ r.exit <- struct{}{}
+ close(r.out)
+ close(r.exit)
+ return nil
+}
+
+func (r *redisChannel) message() *pubsub.Message {
+ return <-r.out
+}