diff options
author | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
commit | d0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch) | |
tree | 7e6ec1a320f596b31f205caee5d5753eaa42f4ff /plugins/redis/plugin.go | |
parent | 0323e070103cc2c30d2cdfb12719d753acafe151 (diff) |
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r-- | plugins/redis/plugin.go | 75 |
1 files changed, 67 insertions, 8 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 5b9de5fc..7b5721f4 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -15,7 +15,7 @@ import ( const PluginName = "redis" type Plugin struct { - sync.Mutex + sync.RWMutex // config for RR integration cfg *Config // logger @@ -23,6 +23,7 @@ type Plugin struct { // redis universal client universalClient redis.UniversalClient + // fanIn implementation used to deliver messages from all channels to the single websocket point fanin *FanIn } @@ -70,7 +71,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { }) // init fanin - p.fanin = NewFanIn(p.universalClient, log) + p.fanin = newFanIn(p.universalClient, log) return nil } @@ -82,7 +83,7 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) Stop() error { const op = errors.Op("redis_plugin_stop") - err := p.fanin.Stop() + err := p.fanin.stop() if err != nil { return errors.E(op, err) } @@ -132,15 +133,73 @@ func (p *Plugin) PublishAsync(msg []byte) { }() } -func (p *Plugin) Subscribe(topics ...string) error { - return p.fanin.AddChannel(topics...) +func (p *Plugin) Subscribe(connectionID string, topics ...string) error { + // just add a connection + for i := 0; i < len(topics); i++ { + // key - topic + // value - connectionID + hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID) + res, err := hset.Result() + if err != nil { + return err + } + if res == 0 { + p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i]) + continue + } + } + + // and subscribe after + return p.fanin.sub(topics...) } -func (p *Plugin) Unsubscribe(topics ...string) error { - return p.fanin.RemoveChannel(topics...) +func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { + // Remove topics from the storage + for i := 0; i < len(topics); i++ { + srem := p.universalClient.SRem(context.Background(), topics[i], connectionID) + if srem.Err() != nil { + return srem.Err() + } + } + + for i := 0; i < len(topics); i++ { + // if there are no such topics, we can safely unsubscribe from the redis + ssc := p.universalClient.SMembers(context.Background(), topics[i]) + res, err := ssc.Result() + if err != nil { + return err + } + + // if we have associated connections - skip + if len(res) > 0 { + continue + } + + // else - unsubscribe + err = p.fanin.unsub(topics[i]) + if err != nil { + return err + } + } + + return nil +} + +func (p *Plugin) Connections(topic string, res map[string]struct{}) { + hget := p.universalClient.SMembersMap(context.Background(), topic) + r, err := hget.Result() + if err != nil { + panic(err) + } + + // assighn connections + // res expected to be from the sync.Pool + for k := range r { + res[k] = struct{}{} + } } // Next return next message func (p *Plugin) Next() (*message.Message, error) { - return <-p.fanin.Consume(), nil + return <-p.fanin.consume(), nil } |