diff options
Diffstat (limited to 'plugins/redis')
-rw-r--r-- | plugins/redis/config.go | 34 | ||||
-rw-r--r-- | plugins/redis/kv/config.go | 36 | ||||
-rw-r--r-- | plugins/redis/kv/kv.go | 255 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 77 | ||||
-rw-r--r-- | plugins/redis/pubsub/channel.go | 97 | ||||
-rw-r--r-- | plugins/redis/pubsub/config.go | 34 | ||||
-rw-r--r-- | plugins/redis/pubsub/pubsub.go | 187 |
7 files changed, 0 insertions, 720 deletions
diff --git a/plugins/redis/config.go b/plugins/redis/config.go deleted file mode 100644 index 9acb4b47..00000000 --- a/plugins/redis/config.go +++ /dev/null @@ -1,34 +0,0 @@ -package redis - -import "time" - -type Config struct { - Addrs []string `mapstructure:"addrs"` - DB int `mapstructure:"db"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - MasterName string `mapstructure:"master_name"` - SentinelPassword string `mapstructure:"sentinel_password"` - RouteByLatency bool `mapstructure:"route_by_latency"` - RouteRandomly bool `mapstructure:"route_randomly"` - MaxRetries int `mapstructure:"max_retries"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` - MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` - PoolSize int `mapstructure:"pool_size"` - MinIdleConns int `mapstructure:"min_idle_conns"` - MaxConnAge time.Duration `mapstructure:"max_conn_age"` - ReadTimeout time.Duration `mapstructure:"read_timeout"` - WriteTimeout time.Duration `mapstructure:"write_timeout"` - PoolTimeout time.Duration `mapstructure:"pool_timeout"` - IdleTimeout time.Duration `mapstructure:"idle_timeout"` - IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` - ReadOnly bool `mapstructure:"read_only"` -} - -// InitDefaults initializing fill config with default values -func (s *Config) InitDefaults() { - if s.Addrs == nil { - s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage - } -} diff --git a/plugins/redis/kv/config.go b/plugins/redis/kv/config.go deleted file mode 100644 index 5bd772a9..00000000 --- a/plugins/redis/kv/config.go +++ /dev/null @@ -1,36 +0,0 @@ -package kv - -import ( - "time" -) - -type Config struct { - Addrs []string `mapstructure:"addrs"` - DB int `mapstructure:"db"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - MasterName string `mapstructure:"master_name"` - SentinelPassword string `mapstructure:"sentinel_password"` - RouteByLatency bool `mapstructure:"route_by_latency"` - RouteRandomly bool `mapstructure:"route_randomly"` - MaxRetries int `mapstructure:"max_retries"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` - MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` - PoolSize int `mapstructure:"pool_size"` - MinIdleConns int `mapstructure:"min_idle_conns"` - MaxConnAge time.Duration `mapstructure:"max_conn_age"` - ReadTimeout time.Duration `mapstructure:"read_timeout"` - WriteTimeout time.Duration `mapstructure:"write_timeout"` - PoolTimeout time.Duration `mapstructure:"pool_timeout"` - IdleTimeout time.Duration `mapstructure:"idle_timeout"` - IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` - ReadOnly bool `mapstructure:"read_only"` -} - -// InitDefaults initializing fill config with default values -func (s *Config) InitDefaults() { - if s.Addrs == nil { - s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage - } -} diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go deleted file mode 100644 index ae55d332..00000000 --- a/plugins/redis/kv/kv.go +++ /dev/null @@ -1,255 +0,0 @@ -package kv - -import ( - "context" - "strings" - "time" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" - "github.com/spiral/roadrunner/v2/utils" -) - -type Driver struct { - universalClient redis.UniversalClient - log logger.Logger - cfg *Config -} - -func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { - const op = errors.Op("new_redis_driver") - - d := &Driver{ - log: log, - } - - // will be different for every connected driver - err := cfgPlugin.UnmarshalKey(key, &d.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - if d.cfg == nil { - return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key)) - } - - d.cfg.InitDefaults() - - d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: d.cfg.Addrs, - DB: d.cfg.DB, - Username: d.cfg.Username, - Password: d.cfg.Password, - SentinelPassword: d.cfg.SentinelPassword, - MaxRetries: d.cfg.MaxRetries, - MinRetryBackoff: d.cfg.MaxRetryBackoff, - MaxRetryBackoff: d.cfg.MaxRetryBackoff, - DialTimeout: d.cfg.DialTimeout, - ReadTimeout: d.cfg.ReadTimeout, - WriteTimeout: d.cfg.WriteTimeout, - PoolSize: d.cfg.PoolSize, - MinIdleConns: d.cfg.MinIdleConns, - MaxConnAge: d.cfg.MaxConnAge, - PoolTimeout: d.cfg.PoolTimeout, - IdleTimeout: d.cfg.IdleTimeout, - IdleCheckFrequency: d.cfg.IdleCheckFreq, - ReadOnly: d.cfg.ReadOnly, - RouteByLatency: d.cfg.RouteByLatency, - RouteRandomly: d.cfg.RouteRandomly, - MasterName: d.cfg.MasterName, - }) - - return d, nil -} - -// Has checks if value exists. -func (d *Driver) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("redis_driver_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - m := make(map[string]bool, len(keys)) - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - exist, err := d.universalClient.Exists(context.Background(), key).Result() - if err != nil { - return nil, err - } - if exist == 1 { - m[key] = true - } - } - return m, nil -} - -// Get loads key content into slice. -func (d *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("redis_driver_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - return d.universalClient.Get(context.Background(), key).Bytes() -} - -// MGet loads content of multiple values (some values might be skipped). -// https://redis.io/commands/mget -// Returns slice with the interfaces with values -func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { - const op = errors.Op("redis_driver_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - - for _, k := range keys { - cmd := d.universalClient.Get(context.Background(), k) - if cmd.Err() != nil { - if cmd.Err() == redis.Nil { - continue - } - return nil, errors.E(op, cmd.Err()) - } - - m[k] = utils.AsBytes(cmd.Val()) - } - - return m, nil -} - -// Set sets value with the TTL in seconds -// https://redis.io/commands/set -// Redis `SET key value [expiration]` command. -// -// Use expiration for `SETEX`-like behavior. -// Zero expiration means the key has no expiration time. -func (d *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("redis_driver_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - now := time.Now() - for _, item := range items { - if item == nil { - return errors.E(op, errors.EmptyKey) - } - - if item.Timeout == "" { - err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err() - if err != nil { - return err - } - } else { - t, err := time.Parse(time.RFC3339, item.Timeout) - if err != nil { - return err - } - err = d.universalClient.Set(context.Background(), item.Key, item.Value, t.Sub(now)).Err() - if err != nil { - return err - } - } - } - return nil -} - -// Delete one or multiple keys. -func (d *Driver) Delete(keys ...string) error { - const op = errors.Op("redis_driver_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - return d.universalClient.Del(context.Background(), keys...).Err() -} - -// MExpire https://redis.io/commands/expire -// timeout in RFC3339 -func (d *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("redis_driver_mexpire") - now := time.Now() - for _, item := range items { - if item == nil { - continue - } - if item.Timeout == "" || strings.TrimSpace(item.Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - t, err := time.Parse(time.RFC3339, item.Timeout) - if err != nil { - return err - } - - // t guessed to be in future - // for Redis we use t.Sub, it will result in seconds, like 4.2s - d.universalClient.Expire(context.Background(), item.Key, t.Sub(now)) - } - - return nil -} - -// TTL https://redis.io/commands/ttl -// return time in seconds (float64) for a given keys -func (d *Driver) TTL(keys ...string) (map[string]string, error) { - const op = errors.Op("redis_driver_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]string, len(keys)) - - for _, key := range keys { - duration, err := d.universalClient.TTL(context.Background(), key).Result() - if err != nil { - return nil, err - } - - m[key] = duration.String() - } - return m, nil -} - -func (d *Driver) Clear() error { - fdb := d.universalClient.FlushDB(context.Background()) - if fdb.Err() != nil { - return fdb.Err() - } - - return nil -} - -func (d *Driver) Stop() {} diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go deleted file mode 100644 index 961182a9..00000000 --- a/plugins/redis/plugin.go +++ /dev/null @@ -1,77 +0,0 @@ -package redis - -import ( - "sync" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/kv" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - redis_kv "github.com/spiral/roadrunner/v2/plugins/redis/kv" - redis_pubsub "github.com/spiral/roadrunner/v2/plugins/redis/pubsub" -) - -const PluginName = "redis" - -type Plugin struct { - sync.RWMutex - // config for RR integration - cfgPlugin config.Configurer - // logger - log logger.Logger - // redis universal client - universalClient redis.UniversalClient - - // fanIn implementation used to deliver messages from all channels to the single websocket point - stopCh chan struct{} -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - p.log = log - p.cfgPlugin = cfg - p.stopCh = make(chan struct{}, 1) - - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error) -} - -func (p *Plugin) Stop() error { - const op = errors.Op("redis_plugin_stop") - p.stopCh <- struct{}{} - - if p.universalClient != nil { - err := p.universalClient.Close() - if err != nil { - return errors.E(op, err) - } - } - - return nil -} - -func (p *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (p *Plugin) Available() {} - -// KVConstruct provides KV storage implementation over the redis plugin -func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { - const op = errors.Op("redis_plugin_provide") - st, err := redis_kv.NewRedisDriver(p.log, key, p.cfgPlugin) - if err != nil { - return nil, errors.E(op, err) - } - - return st, nil -} - -func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { - return redis_pubsub.NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh) -} diff --git a/plugins/redis/pubsub/channel.go b/plugins/redis/pubsub/channel.go deleted file mode 100644 index a1655ab2..00000000 --- a/plugins/redis/pubsub/channel.go +++ /dev/null @@ -1,97 +0,0 @@ -package pubsub - -import ( - "context" - "sync" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -type redisChannel struct { - sync.Mutex - - // redis client - client redis.UniversalClient - pubsub *redis.PubSub - - log logger.Logger - - // out channel with all subs - out chan *pubsub.Message - - exit chan struct{} -} - -func newRedisChannel(redisClient redis.UniversalClient, log logger.Logger) *redisChannel { - out := make(chan *pubsub.Message, 100) - fi := &redisChannel{ - out: out, - client: redisClient, - pubsub: redisClient.Subscribe(context.Background()), - exit: make(chan struct{}), - log: log, - } - - // start reading messages - go fi.read() - - return fi -} - -func (r *redisChannel) sub(topics ...string) error { - const op = errors.Op("redis_sub") - err := r.pubsub.Subscribe(context.Background(), topics...) - if err != nil { - return errors.E(op, err) - } - return nil -} - -// read reads messages from the pubsub subscription -func (r *redisChannel) read() { - for { - select { - // here we receive message from us (which we sent before in Publish) - // it should be compatible with the pubsub.Message structure - // payload should be in the redis.message.payload field - - case msg, ok := <-r.pubsub.Channel(): - // channel closed - if !ok { - return - } - - r.out <- &pubsub.Message{ - Topic: msg.Channel, - Payload: utils.AsBytes(msg.Payload), - } - - case <-r.exit: - return - } - } -} - -func (r *redisChannel) unsub(topic string) error { - const op = errors.Op("redis_unsub") - err := r.pubsub.Unsubscribe(context.Background(), topic) - if err != nil { - return errors.E(op, err) - } - return nil -} - -func (r *redisChannel) stop() error { - r.exit <- struct{}{} - close(r.out) - close(r.exit) - return nil -} - -func (r *redisChannel) message() chan *pubsub.Message { - return r.out -} diff --git a/plugins/redis/pubsub/config.go b/plugins/redis/pubsub/config.go deleted file mode 100644 index bf8d2fc9..00000000 --- a/plugins/redis/pubsub/config.go +++ /dev/null @@ -1,34 +0,0 @@ -package pubsub - -import "time" - -type Config struct { - Addrs []string `mapstructure:"addrs"` - DB int `mapstructure:"db"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - MasterName string `mapstructure:"master_name"` - SentinelPassword string `mapstructure:"sentinel_password"` - RouteByLatency bool `mapstructure:"route_by_latency"` - RouteRandomly bool `mapstructure:"route_randomly"` - MaxRetries int `mapstructure:"max_retries"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` - MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` - PoolSize int `mapstructure:"pool_size"` - MinIdleConns int `mapstructure:"min_idle_conns"` - MaxConnAge time.Duration `mapstructure:"max_conn_age"` - ReadTimeout time.Duration `mapstructure:"read_timeout"` - WriteTimeout time.Duration `mapstructure:"write_timeout"` - PoolTimeout time.Duration `mapstructure:"pool_timeout"` - IdleTimeout time.Duration `mapstructure:"idle_timeout"` - IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` - ReadOnly bool `mapstructure:"read_only"` -} - -// InitDefaults initializing fill config with default values -func (s *Config) InitDefaults() { - if s.Addrs == nil { - s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage - } -} diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go deleted file mode 100644 index 3561ef18..00000000 --- a/plugins/redis/pubsub/pubsub.go +++ /dev/null @@ -1,187 +0,0 @@ -package pubsub - -import ( - "context" - "sync" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -type PubSubDriver struct { - sync.RWMutex - cfg *Config - - log logger.Logger - channel *redisChannel - universalClient redis.UniversalClient - stopCh chan struct{} -} - -func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stopCh chan struct{}) (*PubSubDriver, error) { - const op = errors.Op("new_pub_sub_driver") - ps := &PubSubDriver{ - log: log, - stopCh: stopCh, - } - - // will be different for every connected driver - err := cfgPlugin.UnmarshalKey(key, &ps.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - if ps.cfg == nil { - return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key)) - } - - ps.cfg.InitDefaults() - - ps.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: ps.cfg.Addrs, - DB: ps.cfg.DB, - Username: ps.cfg.Username, - Password: ps.cfg.Password, - SentinelPassword: ps.cfg.SentinelPassword, - MaxRetries: ps.cfg.MaxRetries, - MinRetryBackoff: ps.cfg.MaxRetryBackoff, - MaxRetryBackoff: ps.cfg.MaxRetryBackoff, - DialTimeout: ps.cfg.DialTimeout, - ReadTimeout: ps.cfg.ReadTimeout, - WriteTimeout: ps.cfg.WriteTimeout, - PoolSize: ps.cfg.PoolSize, - MinIdleConns: ps.cfg.MinIdleConns, - MaxConnAge: ps.cfg.MaxConnAge, - PoolTimeout: ps.cfg.PoolTimeout, - IdleTimeout: ps.cfg.IdleTimeout, - IdleCheckFrequency: ps.cfg.IdleCheckFreq, - ReadOnly: ps.cfg.ReadOnly, - RouteByLatency: ps.cfg.RouteByLatency, - RouteRandomly: ps.cfg.RouteRandomly, - MasterName: ps.cfg.MasterName, - }) - - statusCmd := ps.universalClient.Ping(context.Background()) - if statusCmd.Err() != nil { - return nil, statusCmd.Err() - } - - ps.channel = newRedisChannel(ps.universalClient, log) - - ps.stop() - - return ps, nil -} - -func (p *PubSubDriver) stop() { - go func() { - for range p.stopCh { - _ = p.channel.stop() - return - } - }() -} - -func (p *PubSubDriver) Publish(msg *pubsub.Message) error { - p.Lock() - defer p.Unlock() - - f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload) - if f.Err() != nil { - return f.Err() - } - - return nil -} - -func (p *PubSubDriver) PublishAsync(msg *pubsub.Message) { - go func() { - p.Lock() - defer p.Unlock() - - f := p.universalClient.Publish(context.Background(), msg.Topic, msg.Payload) - if f.Err() != nil { - p.log.Error("redis publish", "error", f.Err()) - } - }() -} - -func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error { - // just add a connection - for i := 0; i < len(topics); i++ { - // key - topic - // value - connectionID - hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID) - res, err := hset.Result() - if err != nil { - return err - } - if res == 0 { - p.log.Warn("could not subscribe to the provided topic, you might be already subscribed to it", "connectionID", connectionID, "topic", topics[i]) - continue - } - } - - // and subscribe after - return p.channel.sub(topics...) -} - -func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error { - // Remove topics from the storage - for i := 0; i < len(topics); i++ { - srem := p.universalClient.SRem(context.Background(), topics[i], connectionID) - if srem.Err() != nil { - return srem.Err() - } - } - - for i := 0; i < len(topics); i++ { - // if there are no such topics, we can safely unsubscribe from the redis - exists := p.universalClient.Exists(context.Background(), topics[i]) - res, err := exists.Result() - if err != nil { - return err - } - - // if we have associated connections - skip - if res == 1 { // exists means that topic still exists and some other nodes may have connections associated with it - continue - } - - // else - unsubscribe - err = p.channel.unsub(topics[i]) - if err != nil { - return err - } - } - - return nil -} - -func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) { - hget := p.universalClient.SMembersMap(context.Background(), topic) - r, err := hget.Result() - if err != nil { - panic(err) - } - - // assign connections - // res expected to be from the sync.Pool - for k := range r { - res[k] = struct{}{} - } -} - -// Next return next message -func (p *PubSubDriver) Next(ctx context.Context) (*pubsub.Message, error) { - const op = errors.Op("redis_driver_next") - select { - case msg := <-p.channel.message(): - return msg, nil - case <-ctx.Done(): - return nil, errors.E(op, errors.TimeOut, ctx.Err()) - } -} |