summaryrefslogtreecommitdiff
path: root/plugins/redis/fanin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/redis/fanin.go')
-rw-r--r--plugins/redis/fanin.go13
1 files changed, 7 insertions, 6 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 3082f24f..321bfaaa 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -15,6 +15,7 @@ import (
type FanIn struct {
sync.Mutex
+ // redis client
client redis.UniversalClient
pubsub *redis.PubSub
@@ -26,7 +27,7 @@ type FanIn struct {
exit chan struct{}
}
-func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
+func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
out := make(chan *message.Message, 100)
fi := &FanIn{
out: out,
@@ -42,7 +43,7 @@ func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
return fi
}
-func (fi *FanIn) AddChannel(topics ...string) error {
+func (fi *FanIn) sub(topics ...string) error {
const op = errors.Op("fanin_addchannel")
err := fi.pubsub.Subscribe(context.Background(), topics...)
if err != nil {
@@ -71,22 +72,22 @@ func (fi *FanIn) read() {
}
}
-func (fi *FanIn) RemoveChannel(topics ...string) error {
+func (fi *FanIn) unsub(topic string) error {
const op = errors.Op("fanin_remove")
- err := fi.pubsub.Unsubscribe(context.Background(), topics...)
+ err := fi.pubsub.Unsubscribe(context.Background(), topic)
if err != nil {
return errors.E(op, err)
}
return nil
}
-func (fi *FanIn) Stop() error {
+func (fi *FanIn) stop() error {
fi.exit <- struct{}{}
close(fi.out)
close(fi.exit)
return nil
}
-func (fi *FanIn) Consume() <-chan *message.Message {
+func (fi *FanIn) consume() <-chan *message.Message {
return fi.out
}