summaryrefslogtreecommitdiff
path: root/plugins/redis/pubsub/channel.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-16 21:46:50 +0300
committerGitHub <[email protected]>2021-09-16 21:46:50 +0300
commit3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch)
treee723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /plugins/redis/pubsub/channel.go
parent337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff)
parent823d831b57b75f70c7c3bbbee355f2016633bb3b (diff)
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'plugins/redis/pubsub/channel.go')
-rw-r--r--plugins/redis/pubsub/channel.go97
1 files changed, 0 insertions, 97 deletions
diff --git a/plugins/redis/pubsub/channel.go b/plugins/redis/pubsub/channel.go
deleted file mode 100644
index a1655ab2..00000000
--- a/plugins/redis/pubsub/channel.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package pubsub
-
-import (
- "context"
- "sync"
-
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type redisChannel struct {
- sync.Mutex
-
- // redis client
- client redis.UniversalClient
- pubsub *redis.PubSub
-
- log logger.Logger
-
- // out channel with all subs
- out chan *pubsub.Message
-
- exit chan struct{}
-}
-
-func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel {
- out := make(chan *pubsub.Message, 100)
- fi := &redisChannel{
- out: out,
- client: redisClient,
- pubsub: redisClient.Subscribe(context.Background()),
- exit: make(chan struct{}),
- log: log,
- }
-
- // start reading messages
- go fi.read()
-
- return fi
-}
-
-func (r *redisChannel) sub(topics ...string) error {
- const op = errors.Op("redis_sub")
- err := r.pubsub.Subscribe(context.Background(), topics...)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-// read reads messages from the pubsub subscription
-func (r *redisChannel) read() {
- for {
- select {
- // here we receive message from us (which we sent before in Publish)
- // it should be compatible with the pubsub.Message structure
- // payload should be in the redis.message.payload field
-
- case msg, ok := <-r.pubsub.Channel():
- // channel closed
- if !ok {
- return
- }
-
- r.out <- &pubsub.Message{
- Topic: msg.Channel,
- Payload: utils.AsBytes(msg.Payload),
- }
-
- case <-r.exit:
- return
- }
- }
-}
-
-func (r *redisChannel) unsub(topic string) error {
- const op = errors.Op("redis_unsub")
- err := r.pubsub.Unsubscribe(context.Background(), topic)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-func (r *redisChannel) stop() error {
- r.exit <- struct{}{}
- close(r.out)
- close(r.exit)
- return nil
-}
-
-func (r *redisChannel) message() chan *pubsub.Message {
- return r.out
-}