diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:46:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:46:50 +0300 |
commit | 3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch) | |
tree | e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /plugins/redis/pubsub/channel.go | |
parent | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff) | |
parent | 823d831b57b75f70c7c3bbbee355f2016633bb3b (diff) |
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'plugins/redis/pubsub/channel.go')
-rw-r--r-- | plugins/redis/pubsub/channel.go | 97 |
1 files changed, 0 insertions, 97 deletions
diff --git a/plugins/redis/pubsub/channel.go b/plugins/redis/pubsub/channel.go deleted file mode 100644 index a1655ab2..00000000 --- a/plugins/redis/pubsub/channel.go +++ /dev/null @@ -1,97 +0,0 @@ -package pubsub - -import ( - "context" - "sync" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -type redisChannel struct { - sync.Mutex - - // redis client - client redis.UniversalClient - pubsub *redis.PubSub - - log logger.Logger - - // out channel with all subs - out chan *pubsub.Message - - exit chan struct{} -} - -func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { - out := make(chan *pubsub.Message, 100) - fi := &redisChannel{ - out: out, - client: redisClient, - pubsub: redisClient.Subscribe(context.Background()), - exit: make(chan struct{}), - log: log, - } - - // start reading messages - go fi.read() - - return fi -} - -func (r *redisChannel) sub(topics ...string) error { - const op = errors.Op("redis_sub") - err := r.pubsub.Subscribe(context.Background(), topics...) - if err != nil { - return errors.E(op, err) - } - return nil -} - -// read reads messages from the pubsub subscription -func (r *redisChannel) read() { - for { - select { - // here we receive message from us (which we sent before in Publish) - // it should be compatible with the pubsub.Message structure - // payload should be in the redis.message.payload field - - case msg, ok := <-r.pubsub.Channel(): - // channel closed - if !ok { - return - } - - r.out <- &pubsub.Message{ - Topic: msg.Channel, - Payload: utils.AsBytes(msg.Payload), - } - - case <-r.exit: - return - } - } -} - -func (r *redisChannel) unsub(topic string) error { - const op = errors.Op("redis_unsub") - err := r.pubsub.Unsubscribe(context.Background(), topic) - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (r *redisChannel) stop() error { - r.exit <- struct{}{} - close(r.out) - close(r.exit) - return nil -} - -func (r *redisChannel) message() chan *pubsub.Message { - return r.out -} |