summaryrefslogtreecommitdiff
path: root/plugins/redis/pubsub.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis/pubsub.go')
-rw-r--r--plugins/redis/pubsub.go12
1 files changed, 6 insertions, 6 deletions
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index 7253511d..4e41acb5 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -16,7 +16,7 @@ type PubSubDriver struct {
cfg *Config `mapstructure:"redis"`
log logger.Logger
- fanin *FanIn
+ channel *redisChannel
universalClient redis.UniversalClient
stopCh chan struct{}
}
@@ -65,7 +65,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
return nil, statusCmd.Err()
}
- ps.fanin = newFanIn(ps.universalClient, log)
+ ps.channel = newRedisChannel(ps.universalClient, log)
ps.stop()
@@ -75,7 +75,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
func (p *PubSubDriver) stop() {
go func() {
for range p.stopCh {
- _ = p.fanin.stop()
+ _ = p.channel.stop()
return
}
}()
@@ -122,7 +122,7 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
}
// and subscribe after
- return p.fanin.sub(topics...)
+ return p.channel.sub(topics...)
}
func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
@@ -148,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error
}
// else - unsubscribe
- err = p.fanin.unsub(topics[i])
+ err = p.channel.unsub(topics[i])
if err != nil {
return err
}
@@ -173,5 +173,5 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
// Next return next message
func (p *PubSubDriver) Next() (*pubsub.Message, error) {
- return <-p.fanin.consume(), nil
+ return p.channel.message(), nil
}