summaryrefslogtreecommitdiff
path: root/plugins/redis/pubsub/pubsub.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-16 21:46:50 +0300
committerGitHub <[email protected]>2021-09-16 21:46:50 +0300
commit3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch)
treee723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /plugins/redis/pubsub/pubsub.go
parent337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff)
parent823d831b57b75f70c7c3bbbee355f2016633bb3b (diff)
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'plugins/redis/pubsub/pubsub.go')
-rw-r--r--plugins/redis/pubsub/pubsub.go187
1 files changed, 0 insertions, 187 deletions
diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go
deleted file mode 100644
index 3561ef18..00000000
--- a/plugins/redis/pubsub/pubsub.go
+++ /dev/null
@@ -1,187 +0,0 @@
-package pubsub
-
-import (
- "context"
- "sync"
-
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-type PubSubDriver struct {
- sync.RWMutex
- cfg *Config
-
- log logger.Logger
- channel *redisChannel
- universalClient redis.UniversalClient
- stopCh chan struct{}
-}
-
-func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stopCh chan struct{}) (*PubSubDriver, error) {
- const op = errors.Op("new_pub_sub_driver")
- ps := &PubSubDriver{
- log: log,
- stopCh: stopCh,
- }
-
- // will be different for every connected driver
- err := cfgPlugin.UnmarshalKey(key, &ps.cfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- if ps.cfg == nil {
- return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
- }
-
- ps.cfg.InitDefaults()
-
- ps.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: ps.cfg.Addrs,
- DB: ps.cfg.DB,
- Username: ps.cfg.Username,
- Password: ps.cfg.Password,
- SentinelPassword: ps.cfg.SentinelPassword,
- MaxRetries: ps.cfg.MaxRetries,
- MinRetryBackoff: ps.cfg.MaxRetryBackoff,
- MaxRetryBackoff: ps.cfg.MaxRetryBackoff,
- DialTimeout: ps.cfg.DialTimeout,
- ReadTimeout: ps.cfg.ReadTimeout,
- WriteTimeout: ps.cfg.WriteTimeout,
- PoolSize: ps.cfg.PoolSize,
- MinIdleConns: ps.cfg.MinIdleConns,
- MaxConnAge: ps.cfg.MaxConnAge,
- PoolTimeout: ps.cfg.PoolTimeout,
- IdleTimeout: ps.cfg.IdleTimeout,
- IdleCheckFrequency: ps.cfg.IdleCheckFreq,
- ReadOnly: ps.cfg.ReadOnly,
- RouteByLatency: ps.cfg.RouteByLatency,
- RouteRandomly: ps.cfg.RouteRandomly,
- MasterName: ps.cfg.MasterName,
- })
-
- statusCmd := ps.universalClient.Ping(context.Background())
- if statusCmd.Err() != nil {
- return nil, statusCmd.Err()
- }
-
- ps.channel = newRedisChannel(ps.universalClient, log)
-
- ps.stop()
-
- return ps, nil
-}
-
-func (p *PubSubDriver) stop() {
- go func() {
- for range p.stopCh {
- _ = p.channel.stop()
- return
- }
- }()
-}
-
-func (p *PubSubDriver) Publish(msg *pubsub.Message) error {
- p.Lock()
- defer p.Unlock()
-
- f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
- if f.Err() != nil {
- return f.Err()
- }
-
- return nil
-}
-
-func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) {
- go func() {
- p.Lock()
- defer p.Unlock()
-
- f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload)
- if f.Err() != nil {
- p.log.Error("redis publish", "error", f.Err())
- }
- }()
-}
-
-func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
- // just add a connection
- for i := 0; i < len(topics); i++ {
- // key - topic
- // value - connectionID
- hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID)
- res, err := hset.Result()
- if err != nil {
- return err
- }
- if res == 0 {
- p.log.Warn("could not subscribe to the provided topic, you might be already subscribed to it", "connectionID", connectionID, "topic", topics[i])
- continue
- }
- }
-
- // and subscribe after
- return p.channel.sub(topics...)
-}
-
-func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
- // Remove topics from the storage
- for i := 0; i < len(topics); i++ {
- srem := p.universalClient.SRem(context.Background(), topics[i], connectionID)
- if srem.Err() != nil {
- return srem.Err()
- }
- }
-
- for i := 0; i < len(topics); i++ {
- // if there are no such topics, we can safely unsubscribe from the redis
- exists := p.universalClient.Exists(context.Background(), topics[i])
- res, err := exists.Result()
- if err != nil {
- return err
- }
-
- // if we have associated connections - skip
- if res == 1 { // exists means that topic still exists and some other nodes may have connections associated with it
- continue
- }
-
- // else - unsubscribe
- err = p.channel.unsub(topics[i])
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
- hget := p.universalClient.SMembersMap(context.Background(), topic)
- r, err := hget.Result()
- if err != nil {
- panic(err)
- }
-
- // assign connections
- // res expected to be from the sync.Pool
- for k := range r {
- res[k] = struct{}{}
- }
-}
-
-// Next return next message
-func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) {
- const op = errors.Op("redis_driver_next")
- select {
- case msg := <-p.channel.message():
- return msg, nil
- case <-ctx.Done():
- return nil, errors.E(op, errors.TimeOut, ctx.Err())
- }
-}