diff options
Diffstat (limited to 'plugins/redis/pubsub.go')
-rw-r--r-- | plugins/redis/pubsub.go | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index 7253511d..4e41acb5 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -16,7 +16,7 @@ type PubSubDriver struct { cfg *Config `mapstructure:"redis"` log logger.Logger - fanin *FanIn + channel *redisChannel universalClient redis.UniversalClient stopCh chan struct{} } @@ -65,7 +65,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, return nil, statusCmd.Err() } - ps.fanin = newFanIn(ps.universalClient, log) + ps.channel = newRedisChannel(ps.universalClient, log) ps.stop() @@ -75,7 +75,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, func (p *PubSubDriver) stop() { go func() { for range p.stopCh { - _ = p.fanin.stop() + _ = p.channel.stop() return } }() @@ -122,7 +122,7 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { } // and subscribe after - return p.fanin.sub(topics...) + return p.channel.sub(topics...) } func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { @@ -148,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error } // else - unsubscribe - err = p.fanin.unsub(topics[i]) + err = p.channel.unsub(topics[i]) if err != nil { return err } @@ -173,5 +173,5 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { // Next return next message func (p *PubSubDriver) Next() (*pubsub.Message, error) { - return <-p.fanin.consume(), nil + return p.channel.message(), nil } |