diff options
Diffstat (limited to 'plugins/redis/fanin.go')
-rw-r--r-- | plugins/redis/fanin.go | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 3082f24f..321bfaaa 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -15,6 +15,7 @@ import ( type FanIn struct { sync.Mutex + // redis client client redis.UniversalClient pubsub *redis.PubSub @@ -26,7 +27,7 @@ type FanIn struct { exit chan struct{} } -func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { +func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { out := make(chan *message.Message, 100) fi := &FanIn{ out: out, @@ -42,7 +43,7 @@ func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { return fi } -func (fi *FanIn) AddChannel(topics ...string) error { +func (fi *FanIn) sub(topics ...string) error { const op = errors.Op("fanin_addchannel") err := fi.pubsub.Subscribe(context.Background(), topics...) if err != nil { @@ -71,22 +72,22 @@ func (fi *FanIn) read() { } } -func (fi *FanIn) RemoveChannel(topics ...string) error { +func (fi *FanIn) unsub(topic string) error { const op = errors.Op("fanin_remove") - err := fi.pubsub.Unsubscribe(context.Background(), topics...) + err := fi.pubsub.Unsubscribe(context.Background(), topic) if err != nil { return errors.E(op, err) } return nil } -func (fi *FanIn) Stop() error { +func (fi *FanIn) stop() error { fi.exit <- struct{}{} close(fi.out) close(fi.exit) return nil } -func (fi *FanIn) Consume() <-chan *message.Message { +func (fi *FanIn) consume() <-chan *message.Message { return fi.out } |