diff options
author | Valery Piashchynski <[email protected]> | 2021-06-21 00:34:53 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-21 00:34:53 +0300 |
commit | 2ab22ac9e935efb126b51e9c3521073e6a5155a1 (patch) | |
tree | 5c6b1d4ee2aea4e6a1cc828ca1fcb2306ef9741e /plugins/redis | |
parent | 18c072d5dbe3ca96fe2198f323d3bf520972e80f (diff) |
- Minor tests improvenments
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis')
-rw-r--r-- | plugins/redis/channel.go (renamed from plugins/redis/fanin.go) | 40 | ||||
-rw-r--r-- | plugins/redis/pubsub.go | 12 |
2 files changed, 26 insertions, 26 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/channel.go index 40a99d20..5817853c 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/channel.go @@ -11,7 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type FanIn struct { +type redisChannel struct { sync.Mutex // redis client @@ -26,9 +26,9 @@ type FanIn struct { exit chan struct{} } -func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { +func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { out := make(chan *pubsub.Message, 100) - fi := &FanIn{ + fi := &redisChannel{ out: out, client: redisClient, pubsub: redisClient.Subscribe(context.Background()), @@ -42,9 +42,9 @@ func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { return fi } -func (fi *FanIn) sub(topics ...string) error { - const op = errors.Op("fanin_addchannel") - err := fi.pubsub.Subscribe(context.Background(), topics...) +func (r *redisChannel) sub(topics ...string) error { + const op = errors.Op("redis_sub") + err := r.pubsub.Subscribe(context.Background(), topics...) if err != nil { return errors.E(op, err) } @@ -52,46 +52,46 @@ func (fi *FanIn) sub(topics ...string) error { } // read reads messages from the pubsub subscription -func (fi *FanIn) read() { +func (r *redisChannel) read() { for { select { // here we receive message from us (which we sent before in Publish) - // it should be compatible with the websockets.Msg interface + // it should be compatible with the pubsub.Message structure // payload should be in the redis.message.payload field - case msg, ok := <-fi.pubsub.Channel(): + case msg, ok := <-r.pubsub.Channel(): // channel closed if !ok { return } - fi.out <- &pubsub.Message{ + r.out <- &pubsub.Message{ Topic: msg.Channel, Payload: utils.AsBytes(msg.Payload), } - case <-fi.exit: + case <-r.exit: return } } } -func (fi *FanIn) unsub(topic string) error { - const op = errors.Op("fanin_remove") - err := fi.pubsub.Unsubscribe(context.Background(), topic) +func (r *redisChannel) unsub(topic string) error { + const op = errors.Op("redis_unsub") + err := r.pubsub.Unsubscribe(context.Background(), topic) if err != nil { return errors.E(op, err) } return nil } -func (fi *FanIn) stop() error { - fi.exit <- struct{}{} - close(fi.out) - close(fi.exit) +func (r *redisChannel) stop() error { + r.exit <- struct{}{} + close(r.out) + close(r.exit) return nil } -func (fi *FanIn) consume() <-chan *pubsub.Message { - return fi.out +func (r *redisChannel) message() *pubsub.Message { + return <-r.out } diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index 7253511d..4e41acb5 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -16,7 +16,7 @@ type PubSubDriver struct { cfg *Config `mapstructure:"redis"` log logger.Logger - fanin *FanIn + channel *redisChannel universalClient redis.UniversalClient stopCh chan struct{} } @@ -65,7 +65,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, return nil, statusCmd.Err() } - ps.fanin = newFanIn(ps.universalClient, log) + ps.channel = newRedisChannel(ps.universalClient, log) ps.stop() @@ -75,7 +75,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, func (p *PubSubDriver) stop() { go func() { for range p.stopCh { - _ = p.fanin.stop() + _ = p.channel.stop() return } }() @@ -122,7 +122,7 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { } // and subscribe after - return p.fanin.sub(topics...) + return p.channel.sub(topics...) } func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { @@ -148,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error } // else - unsubscribe - err = p.fanin.unsub(topics[i]) + err = p.channel.unsub(topics[i]) if err != nil { return err } @@ -173,5 +173,5 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { // Next return next message func (p *PubSubDriver) Next() (*pubsub.Message, error) { - return <-p.fanin.consume(), nil + return p.channel.message(), nil } |