diff options
author | Valery Piashchynski <[email protected]> | 2021-06-21 00:34:53 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-21 00:34:53 +0300 |
commit | 2ab22ac9e935efb126b51e9c3521073e6a5155a1 (patch) | |
tree | 5c6b1d4ee2aea4e6a1cc828ca1fcb2306ef9741e /plugins | |
parent | 18c072d5dbe3ca96fe2198f323d3bf520972e80f (diff) |
- Minor tests improvenments
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/broadcast/plugin.go | 19 | ||||
-rw-r--r-- | plugins/redis/channel.go (renamed from plugins/redis/fanin.go) | 40 | ||||
-rw-r--r-- | plugins/redis/pubsub.go | 12 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 2 |
4 files changed, 42 insertions, 31 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 04a4fb80..6ddef806 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -116,7 +116,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { }() } -func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit const op = errors.Op("broadcast_plugin_get_driver") // choose a driver @@ -164,17 +164,28 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { return nil, errors.E(op, err) } - // save the pubsub under a config key - // + // if section already exists, return new connection + if _, ok := p.publishers[configKey]; ok { + return ps, nil + } + + // if not - initialize a connection p.publishers[configKey] = ps return ps, nil + + // then try global if local does not exist case p.cfgPlugin.Has(redis): ps, err := p.constructors[redis].PSConstruct(configKey) if err != nil { return nil, errors.E(op, err) } - // save the pubsub + // if section already exists, return new connection + if _, ok := p.publishers[configKey]; ok { + return ps, nil + } + + // if not - initialize a connection p.publishers[configKey] = ps return ps, nil } diff --git a/plugins/redis/fanin.go b/plugins/redis/channel.go index 40a99d20..5817853c 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/channel.go @@ -11,7 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" ) -type FanIn struct { +type redisChannel struct { sync.Mutex // redis client @@ -26,9 +26,9 @@ type FanIn struct { exit chan struct{} } -func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { +func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { out := make(chan *pubsub.Message, 100) - fi := &FanIn{ + fi := &redisChannel{ out: out, client: redisClient, pubsub: redisClient.Subscribe(context.Background()), @@ -42,9 +42,9 @@ func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { return fi } -func (fi *FanIn) sub(topics ...string) error { - const op = errors.Op("fanin_addchannel") - err := fi.pubsub.Subscribe(context.Background(), topics...) +func (r *redisChannel) sub(topics ...string) error { + const op = errors.Op("redis_sub") + err := r.pubsub.Subscribe(context.Background(), topics...) if err != nil { return errors.E(op, err) } @@ -52,46 +52,46 @@ func (fi *FanIn) sub(topics ...string) error { } // read reads messages from the pubsub subscription -func (fi *FanIn) read() { +func (r *redisChannel) read() { for { select { // here we receive message from us (which we sent before in Publish) - // it should be compatible with the websockets.Msg interface + // it should be compatible with the pubsub.Message structure // payload should be in the redis.message.payload field - case msg, ok := <-fi.pubsub.Channel(): + case msg, ok := <-r.pubsub.Channel(): // channel closed if !ok { return } - fi.out <- &pubsub.Message{ + r.out <- &pubsub.Message{ Topic: msg.Channel, Payload: utils.AsBytes(msg.Payload), } - case <-fi.exit: + case <-r.exit: return } } } -func (fi *FanIn) unsub(topic string) error { - const op = errors.Op("fanin_remove") - err := fi.pubsub.Unsubscribe(context.Background(), topic) +func (r *redisChannel) unsub(topic string) error { + const op = errors.Op("redis_unsub") + err := r.pubsub.Unsubscribe(context.Background(), topic) if err != nil { return errors.E(op, err) } return nil } -func (fi *FanIn) stop() error { - fi.exit <- struct{}{} - close(fi.out) - close(fi.exit) +func (r *redisChannel) stop() error { + r.exit <- struct{}{} + close(r.out) + close(r.exit) return nil } -func (fi *FanIn) consume() <-chan *pubsub.Message { - return fi.out +func (r *redisChannel) message() *pubsub.Message { + return <-r.out } diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index 7253511d..4e41acb5 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -16,7 +16,7 @@ type PubSubDriver struct { cfg *Config `mapstructure:"redis"` log logger.Logger - fanin *FanIn + channel *redisChannel universalClient redis.UniversalClient stopCh chan struct{} } @@ -65,7 +65,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, return nil, statusCmd.Err() } - ps.fanin = newFanIn(ps.universalClient, log) + ps.channel = newRedisChannel(ps.universalClient, log) ps.stop() @@ -75,7 +75,7 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, func (p *PubSubDriver) stop() { go func() { for range p.stopCh { - _ = p.fanin.stop() + _ = p.channel.stop() return } }() @@ -122,7 +122,7 @@ func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { } // and subscribe after - return p.fanin.sub(topics...) + return p.channel.sub(topics...) } func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { @@ -148,7 +148,7 @@ func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error } // else - unsubscribe - err = p.fanin.unsub(topics[i]) + err = p.channel.unsub(topics[i]) if err != nil { return err } @@ -173,5 +173,5 @@ func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { // Next return next message func (p *PubSubDriver) Next() (*pubsub.Message, error) { - return <-p.fanin.consume(), nil + return p.channel.message(), nil } diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 00e053ec..752ba3ce 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -93,7 +93,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit wp.subscriber.Connections(msg.Topic, res) if len(res) == 0 { - wp.log.Info("no such topic", "topic", msg.Topic) + wp.log.Info("no connections associated with provided topic", "topic", msg.Topic) wp.put(res) continue } |