From 2ab22ac9e935efb126b51e9c3521073e6a5155a1 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 21 Jun 2021 00:34:53 +0300 Subject: - Minor tests improvenments Signed-off-by: Valery Piashchynski --- plugins/redis/channel.go | 97 ++++++++++++++++++++++++++++++++++++++++++++++++ plugins/redis/fanin.go | 97 ------------------------------------------------ plugins/redis/pubsub.go | 12 +++--- 3 files changed, 103 insertions(+), 103 deletions(-) create mode 100644 plugins/redis/channel.go delete mode 100644 plugins/redis/fanin.go (limited to 'plugins/redis') diff --git a/plugins/redis/channel.go b/plugins/redis/channel.go new file mode 100644 index 00000000..5817853c --- /dev/null +++ b/plugins/redis/channel.go @@ -0,0 +1,97 @@ +package redis + +import ( + "context" + "sync" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" +) + +type redisChannel struct { + sync.Mutex + + // redis client + client redis.UniversalClient + pubsub *redis.PubSub + + log logger.Logger + + // out channel with all subs + out chan *pubsub.Message + + exit chan struct{} +} + +func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { + out := make(chan *pubsub.Message, 100) + fi := &redisChannel{ + out: out, + client: redisClient, + pubsub: redisClient.Subscribe(context.Background()), + exit: make(chan struct{}), + log: log, + } + + // start reading messages + go fi.read() + + return fi +} + +func (r *redisChannel) sub(topics ...string) error { + const op = errors.Op("redis_sub") + err := r.pubsub.Subscribe(context.Background(), topics...) + if err != nil { + return errors.E(op, err) + } + return nil +} + +// read reads messages from the pubsub subscription +func (r *redisChannel) read() { + for { + select { + // here we receive message from us (which we sent before in Publish) + // it should be compatible with the pubsub.Message structure + // payload should be in the redis.message.payload field + + case msg, ok := <-r.pubsub.Channel(): + // channel closed + if !ok { + return + } + + r.out <- &pubsub.Message{ + Topic: msg.Channel, + Payload: utils.AsBytes(msg.Payload), + } + + case <-r.exit: + return + } + } +} + +func (r *redisChannel) unsub(topic string) error { + const op = errors.Op("redis_unsub") + err := r.pubsub.Unsubscribe(context.Background(), topic) + if err != nil { + return errors.E(op, err) + } + return nil +} + +func (r *redisChannel) stop() error { + r.exit <- struct{}{} + close(r.out) + close(r.exit) + return nil +} + +func (r *redisChannel) message() *pubsub.Message { + return <-r.out +} diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go deleted file mode 100644 index 40a99d20..00000000 --- a/plugins/redis/fanin.go +++ /dev/null @@ -1,97 +0,0 @@ -package redis - -import ( - "context" - "sync" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -type FanIn struct { - sync.Mutex - - // redis client - client redis.UniversalClient - pubsub *redis.PubSub - - log logger.Logger - - // out channel with all subs - out chan *pubsub.Message - - exit chan struct{} -} - -func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *pubsub.Message, 100) - fi := &FanIn{ - out: out, - client: redisClient, - pubsub: redisClient.Subscribe(context.Background()), - exit: make(chan struct{}), - log: log, - } - - // start reading messages - go fi.read() - - return fi -} - -func (fi *FanIn) sub(topics ...string) error { - const op = errors.Op("fanin_addchannel") - err := fi.pubsub.Subscribe(context.Background(), topics...) - if err != nil { - return errors.E(op, err) - } - return nil -} - -// read reads messages from the pubsub subscription -func (fi *FanIn) read() { - for { - select { - // here we receive message from us (which we sent before in Publish) - // it should be compatible with the websockets.Msg interface - // payload should be in the redis.message.payload field - - case msg, ok := <-fi.pubsub.Channel(): - // channel closed - if !ok { - return - } - - fi.out <- &pubsub.Message{ - Topic: msg.Channel, - Payload: utils.AsBytes(msg.Payload), - } - - case <-fi.exit: - return - } - } -} - -func (fi *FanIn) unsub(topic string) error { - const op = errors.Op("fanin_remove") - err := fi.pubsub.Unsubscribe(context.Background(), topic) - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (fi *FanIn) stop() error { - fi.exit <- struct{}{} - close(fi.out) - close(fi.exit) - return nil -} - -func (fi *FanIn) consume() <-chan *pubsub.Message { - return fi.out -} diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index 7253511d..4e41acb5 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -16,7 +16,7 @@ type PubSubDriver struct { cfg *Config `mapstructure:"redis"` log logger.Logger - fanin *FanIn + channel *redisChannel universalClient redis.UniversalClient stopCh chan struct{} } @@ -65,7 +65,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, return nil, statusCmd.Err() } - ps.fanin = newFanIn(ps.universalClient, log) + ps.channel = newRedisChannel(ps.universalClient, log) ps.stop() @@ -75,7 +75,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, func (p *PubSubDriver) stop() { go func() { for range p.stopCh { - _ = p.fanin.stop() + _ = p.channel.stop() return } }() @@ -122,7 +122,7 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { } // and subscribe after - return p.fanin.sub(topics...) + return p.channel.sub(topics...) } func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { @@ -148,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error } // else - unsubscribe - err = p.fanin.unsub(topics[i]) + err = p.channel.unsub(topics[i]) if err != nil { return err } @@ -173,5 +173,5 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { // Next return next message func (p *PubSubDriver) Next() (*pubsub.Message, error) { - return <-p.fanin.consume(), nil + return p.channel.message(), nil } -- cgit v1.2.3