diff options
Diffstat (limited to 'plugins/redis')
-rw-r--r-- | plugins/redis/fanin.go | 13 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 75 |
2 files changed, 74 insertions, 14 deletions
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 3082f24f..321bfaaa 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -15,6 +15,7 @@ import ( type FanIn struct { sync.Mutex + // redis client client redis.UniversalClient pubsub *redis.PubSub @@ -26,7 +27,7 @@ type FanIn struct { exit chan struct{} } -func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { +func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { out := make(chan *message.Message, 100) fi := &FanIn{ out: out, @@ -42,7 +43,7 @@ func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { return fi } -func (fi *FanIn) AddChannel(topics ...string) error { +func (fi *FanIn) sub(topics ...string) error { const op = errors.Op("fanin_addchannel") err := fi.pubsub.Subscribe(context.Background(), topics...) if err != nil { @@ -71,22 +72,22 @@ func (fi *FanIn) read() { } } -func (fi *FanIn) RemoveChannel(topics ...string) error { +func (fi *FanIn) unsub(topic string) error { const op = errors.Op("fanin_remove") - err := fi.pubsub.Unsubscribe(context.Background(), topics...) + err := fi.pubsub.Unsubscribe(context.Background(), topic) if err != nil { return errors.E(op, err) } return nil } -func (fi *FanIn) Stop() error { +func (fi *FanIn) stop() error { fi.exit <- struct{}{} close(fi.out) close(fi.exit) return nil } -func (fi *FanIn) Consume() <-chan *message.Message { +func (fi *FanIn) consume() <-chan *message.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 5b9de5fc..7b5721f4 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -15,7 +15,7 @@ import ( const PluginName = "redis" type Plugin struct { - sync.Mutex + sync.RWMutex // config for RR integration cfg *Config // logger @@ -23,6 +23,7 @@ type Plugin struct { // redis universal client universalClient redis.UniversalClient + // fanIn implementation used to deliver messages from all channels to the single websocket point fanin *FanIn } @@ -70,7 +71,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { }) // init fanin - p.fanin = NewFanIn(p.universalClient, log) + p.fanin = newFanIn(p.universalClient, log) return nil } @@ -82,7 +83,7 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) Stop() error { const op = errors.Op("redis_plugin_stop") - err := p.fanin.Stop() + err := p.fanin.stop() if err != nil { return errors.E(op, err) } @@ -132,15 +133,73 @@ func (p *Plugin) PublishAsync(msg []byte) { }() } -func (p *Plugin) Subscribe(topics ...string) error { - return p.fanin.AddChannel(topics...) +func (p *Plugin) Subscribe(connectionID string, topics ...string) error { + // just add a connection + for i := 0; i < len(topics); i++ { + // key - topic + // value - connectionID + hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID) + res, err := hset.Result() + if err != nil { + return err + } + if res == 0 { + p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i]) + continue + } + } + + // and subscribe after + return p.fanin.sub(topics...) } -func (p *Plugin) Unsubscribe(topics ...string) error { - return p.fanin.RemoveChannel(topics...) +func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { + // Remove topics from the storage + for i := 0; i < len(topics); i++ { + srem := p.universalClient.SRem(context.Background(), topics[i], connectionID) + if srem.Err() != nil { + return srem.Err() + } + } + + for i := 0; i < len(topics); i++ { + // if there are no such topics, we can safely unsubscribe from the redis + ssc := p.universalClient.SMembers(context.Background(), topics[i]) + res, err := ssc.Result() + if err != nil { + return err + } + + // if we have associated connections - skip + if len(res) > 0 { + continue + } + + // else - unsubscribe + err = p.fanin.unsub(topics[i]) + if err != nil { + return err + } + } + + return nil +} + +func (p *Plugin) Connections(topic string, res map[string]struct{}) { + hget := p.universalClient.SMembersMap(context.Background(), topic) + r, err := hget.Result() + if err != nil { + panic(err) + } + + // assighn connections + // res expected to be from the sync.Pool + for k := range r { + res[k] = struct{}{} + } } // Next return next message func (p *Plugin) Next() (*message.Message, error) { - return <-p.fanin.Consume(), nil + return <-p.fanin.consume(), nil } |