summaryrefslogtreecommitdiff
path: root/plugins/kv
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv')
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go2
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/drivers/memory/plugin.go2
-rw-r--r--plugins/kv/drivers/redis/config.go34
-rw-r--r--plugins/kv/drivers/redis/driver.go242
-rw-r--r--plugins/kv/drivers/redis/plugin.go51
-rw-r--r--plugins/kv/interface.go2
-rw-r--r--plugins/kv/plugin.go (renamed from plugins/kv/storage.go)40
8 files changed, 33 insertions, 342 deletions
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 9b4cf9f7..28e2a89c 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -46,7 +46,7 @@ func (s *Plugin) Stop() error {
return nil
}
-func (s *Plugin) Provide(key string) (kv.Storage, error) {
+func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index 3997e0d4..936b2047 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -34,7 +34,7 @@ func (s *Plugin) Name() string {
// Available interface implementation
func (s *Plugin) Available() {}
-func (s *Plugin) Provide(key string) (kv.Storage, error) {
+func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go
index 2be7caae..da81017e 100644
--- a/plugins/kv/drivers/memory/plugin.go
+++ b/plugins/kv/drivers/memory/plugin.go
@@ -45,7 +45,7 @@ func (s *Plugin) Stop() error {
return nil
}
-func (s *Plugin) Provide(key string) (kv.Storage, error) {
+func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go
deleted file mode 100644
index 41348236..00000000
--- a/plugins/kv/drivers/redis/config.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package redis
-
-import "time"
-
-type Config struct {
- Addrs []string `mapstructure:"addrs"`
- DB int `mapstructure:"db"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- MasterName string `mapstructure:"master_name"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- MaxRetries int `mapstructure:"max_retries"`
- DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge time.Duration `mapstructure:"max_conn_age"`
- ReadTimeout time.Duration `mapstructure:"read_timeout"`
- WriteTimeout time.Duration `mapstructure:"write_timeout"`
- PoolTimeout time.Duration `mapstructure:"pool_timeout"`
- IdleTimeout time.Duration `mapstructure:"idle_timeout"`
- IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
-}
-
-// InitDefaults initializing fill config with default values
-func (s *Config) InitDefaults() {
- if s.Addrs == nil {
- s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage
- }
-}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
deleted file mode 100644
index 66cb8384..00000000
--- a/plugins/kv/drivers/redis/driver.go
+++ /dev/null
@@ -1,242 +0,0 @@
-package redis
-
-import (
- "context"
- "strings"
- "time"
-
- "github.com/go-redis/redis/v8"
- "github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type Driver struct {
- universalClient redis.UniversalClient
- log logger.Logger
- cfg *Config
-}
-
-func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
- const op = errors.Op("new_boltdb_driver")
-
- d := &Driver{
- log: log,
- }
-
- // will be different for every connected driver
- err := cfgPlugin.UnmarshalKey(key, &d.cfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- d.cfg.InitDefaults()
- d.log = log
-
- d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: d.cfg.Addrs,
- DB: d.cfg.DB,
- Username: d.cfg.Username,
- Password: d.cfg.Password,
- SentinelPassword: d.cfg.SentinelPassword,
- MaxRetries: d.cfg.MaxRetries,
- MinRetryBackoff: d.cfg.MaxRetryBackoff,
- MaxRetryBackoff: d.cfg.MaxRetryBackoff,
- DialTimeout: d.cfg.DialTimeout,
- ReadTimeout: d.cfg.ReadTimeout,
- WriteTimeout: d.cfg.WriteTimeout,
- PoolSize: d.cfg.PoolSize,
- MinIdleConns: d.cfg.MinIdleConns,
- MaxConnAge: d.cfg.MaxConnAge,
- PoolTimeout: d.cfg.PoolTimeout,
- IdleTimeout: d.cfg.IdleTimeout,
- IdleCheckFrequency: d.cfg.IdleCheckFreq,
- ReadOnly: d.cfg.ReadOnly,
- RouteByLatency: d.cfg.RouteByLatency,
- RouteRandomly: d.cfg.RouteRandomly,
- MasterName: d.cfg.MasterName,
- })
-
- return d, nil
-}
-
-// Has checks if value exists.
-func (d *Driver) Has(keys ...string) (map[string]bool, error) {
- const op = errors.Op("redis_driver_has")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- m := make(map[string]bool, len(keys))
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
-
- exist, err := d.universalClient.Exists(context.Background(), key).Result()
- if err != nil {
- return nil, err
- }
- if exist == 1 {
- m[key] = true
- }
- }
- return m, nil
-}
-
-// Get loads key content into slice.
-func (d *Driver) Get(key string) ([]byte, error) {
- const op = errors.Op("redis_driver_get")
- // to get cases like " "
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- return d.universalClient.Get(context.Background(), key).Bytes()
-}
-
-// MGet loads content of multiple values (some values might be skipped).
-// https://redis.io/commands/mget
-// Returns slice with the interfaces with values
-func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
- const op = errors.Op("redis_driver_mget")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string][]byte, len(keys))
-
- for _, k := range keys {
- cmd := d.universalClient.Get(context.Background(), k)
- if cmd.Err() != nil {
- if cmd.Err() == redis.Nil {
- continue
- }
- return nil, errors.E(op, cmd.Err())
- }
-
- m[k] = utils.AsBytes(cmd.Val())
- }
-
- return m, nil
-}
-
-// Set sets value with the TTL in seconds
-// https://redis.io/commands/set
-// Redis `SET key value [expiration]` command.
-//
-// Use expiration for `SETEX`-like behavior.
-// Zero expiration means the key has no expiration time.
-func (d *Driver) Set(items ...*kvv1.Item) error {
- const op = errors.Op("redis_driver_set")
- if items == nil {
- return errors.E(op, errors.NoKeys)
- }
- now := time.Now()
- for _, item := range items {
- if item == nil {
- return errors.E(op, errors.EmptyKey)
- }
-
- if item.Timeout == "" {
- err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
- if err != nil {
- return err
- }
- } else {
- t, err := time.Parse(time.RFC3339, item.Timeout)
- if err != nil {
- return err
- }
- err = d.universalClient.Set(context.Background(), item.Key, item.Value, t.Sub(now)).Err()
- if err != nil {
- return err
- }
- }
- }
- return nil
-}
-
-// Delete one or multiple keys.
-func (d *Driver) Delete(keys ...string) error {
- const op = errors.Op("redis_driver_delete")
- if keys == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return errors.E(op, errors.EmptyKey)
- }
- }
- return d.universalClient.Del(context.Background(), keys...).Err()
-}
-
-// MExpire https://redis.io/commands/expire
-// timeout in RFC3339
-func (d *Driver) MExpire(items ...*kvv1.Item) error {
- const op = errors.Op("redis_driver_mexpire")
- now := time.Now()
- for _, item := range items {
- if item == nil {
- continue
- }
- if item.Timeout == "" || strings.TrimSpace(item.Key) == "" {
- return errors.E(op, errors.Str("should set timeout and at least one key"))
- }
-
- t, err := time.Parse(time.RFC3339, item.Timeout)
- if err != nil {
- return err
- }
-
- // t guessed to be in future
- // for Redis we use t.Sub, it will result in seconds, like 4.2s
- d.universalClient.Expire(context.Background(), item.Key, t.Sub(now))
- }
-
- return nil
-}
-
-// TTL https://redis.io/commands/ttl
-// return time in seconds (float64) for a given keys
-func (d *Driver) TTL(keys ...string) (map[string]string, error) {
- const op = errors.Op("redis_driver_ttl")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string]string, len(keys))
-
- for _, key := range keys {
- duration, err := d.universalClient.TTL(context.Background(), key).Result()
- if err != nil {
- return nil, err
- }
-
- m[key] = duration.String()
- }
- return m, nil
-}
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
deleted file mode 100644
index 3694c5a7..00000000
--- a/plugins/kv/drivers/redis/plugin.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package redis
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const PluginName = "redis"
-
-// Plugin BoltDB K/V storage.
-type Plugin struct {
- cfgPlugin config.Configurer
- // logger
- log logger.Logger
-}
-
-func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
- return errors.E(errors.Disabled)
- }
-
- s.log = log
- s.cfgPlugin = cfg
- return nil
-}
-
-// Serve is noop here
-func (s *Plugin) Serve() chan error {
- return make(chan error)
-}
-
-func (s *Plugin) Stop() error {
- return nil
-}
-
-func (s *Plugin) Provide(key string) (kv.Storage, error) {
- const op = errors.Op("redis_plugin_provide")
- st, err := NewRedisDriver(s.log, key, s.cfgPlugin)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return st, nil
-}
-
-// Name returns plugin name
-func (s *Plugin) Name() string {
- return PluginName
-}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 744c6b51..5aedd5c3 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -37,5 +37,5 @@ type StorageDriver interface {
// Provider provides storage based on the config
type Provider interface {
// Provide provides Storage based on the config key
- Provide(key string) (Storage, error)
+ KVProvide(key string) (Storage, error)
}
diff --git a/plugins/kv/storage.go b/plugins/kv/plugin.go
index 9a609735..efe92252 100644
--- a/plugins/kv/storage.go
+++ b/plugins/kv/plugin.go
@@ -50,7 +50,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (p *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("kv_plugin_serve")
// key - storage name in the config
@@ -100,7 +100,7 @@ func (p *Plugin) Serve() chan error {
continue
}
- storage, err := p.drivers[memcached].Provide(configKey)
+ storage, err := p.drivers[memcached].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -115,7 +115,7 @@ func (p *Plugin) Serve() chan error {
continue
}
- storage, err := p.drivers[boltdb].Provide(configKey)
+ storage, err := p.drivers[boltdb].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -129,7 +129,7 @@ func (p *Plugin) Serve() chan error {
continue
}
- storage, err := p.drivers[memory].Provide(configKey)
+ storage, err := p.drivers[memory].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -143,15 +143,33 @@ func (p *Plugin) Serve() chan error {
continue
}
- storage, err := p.drivers[redis].Provide(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ storage, err := p.drivers[redis].KVProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ case p.cfgPlugin.Has(redis):
+ storage, err := p.drivers[redis].KVProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ continue
+ default:
+ // otherwise - error, no local or global config
+ p.log.Warn("no global or local redis configuration provided", "key", configKey)
+ continue
}
- // save the storage
- p.storages[k] = storage
-
default:
p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])))
}