summaryrefslogtreecommitdiff
path: root/plugins/redis/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
committerValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
commitd0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch)
tree7e6ec1a320f596b31f205caee5d5753eaa42f4ff /plugins/redis/plugin.go
parent0323e070103cc2c30d2cdfb12719d753acafe151 (diff)
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/redis/plugin.go')
-rw-r--r--plugins/redis/plugin.go75
1 files changed, 67 insertions, 8 deletions
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 5b9de5fc..7b5721f4 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -15,7 +15,7 @@ import (
const PluginName = "redis"
type Plugin struct {
- sync.Mutex
+ sync.RWMutex
// config for RR integration
cfg *Config
// logger
@@ -23,6 +23,7 @@ type Plugin struct {
// redis universal client
universalClient redis.UniversalClient
+ // fanIn implementation used to deliver messages from all channels to the single websocket point
fanin *FanIn
}
@@ -70,7 +71,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
})
// init fanin
- p.fanin = NewFanIn(p.universalClient, log)
+ p.fanin = newFanIn(p.universalClient, log)
return nil
}
@@ -82,7 +83,7 @@ func (p *Plugin) Serve() chan error {
func (p *Plugin) Stop() error {
const op = errors.Op("redis_plugin_stop")
- err := p.fanin.Stop()
+ err := p.fanin.stop()
if err != nil {
return errors.E(op, err)
}
@@ -132,15 +133,73 @@ func (p *Plugin) PublishAsync(msg []byte) {
}()
}
-func (p *Plugin) Subscribe(topics ...string) error {
- return p.fanin.AddChannel(topics...)
+func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
+ // just add a connection
+ for i := 0; i < len(topics); i++ {
+ // key - topic
+ // value - connectionID
+ hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID)
+ res, err := hset.Result()
+ if err != nil {
+ return err
+ }
+ if res == 0 {
+ p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i])
+ continue
+ }
+ }
+
+ // and subscribe after
+ return p.fanin.sub(topics...)
}
-func (p *Plugin) Unsubscribe(topics ...string) error {
- return p.fanin.RemoveChannel(topics...)
+func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
+ // Remove topics from the storage
+ for i := 0; i < len(topics); i++ {
+ srem := p.universalClient.SRem(context.Background(), topics[i], connectionID)
+ if srem.Err() != nil {
+ return srem.Err()
+ }
+ }
+
+ for i := 0; i < len(topics); i++ {
+ // if there are no such topics, we can safely unsubscribe from the redis
+ ssc := p.universalClient.SMembers(context.Background(), topics[i])
+ res, err := ssc.Result()
+ if err != nil {
+ return err
+ }
+
+ // if we have associated connections - skip
+ if len(res) > 0 {
+ continue
+ }
+
+ // else - unsubscribe
+ err = p.fanin.unsub(topics[i])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (p *Plugin) Connections(topic string, res map[string]struct{}) {
+ hget := p.universalClient.SMembersMap(context.Background(), topic)
+ r, err := hget.Result()
+ if err != nil {
+ panic(err)
+ }
+
+ // assighn connections
+ // res expected to be from the sync.Pool
+ for k := range r {
+ res[k] = struct{}{}
+ }
}
// Next return next message
func (p *Plugin) Next() (*message.Message, error) {
- return <-p.fanin.Consume(), nil
+ return <-p.fanin.consume(), nil
}