summaryrefslogtreecommitdiff
path: root/plugins/kv
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-04-22 22:53:12 +0300
committerValery Piashchynski <[email protected]>2021-04-22 22:53:12 +0300
commit06ee748c1ca23bf489ae707041f0e8f3e67fbaf7 (patch)
tree4249c893d194c779fd7510cef2de7e71b888735f /plugins/kv
parent91c1fa2e2693cb662425c1ba7cca2325a8458995 (diff)
- Add redis driver
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv')
-rw-r--r--plugins/kv/drivers/boltdb/driver.go14
-rw-r--r--plugins/kv/drivers/redis/config.go34
-rw-r--r--plugins/kv/drivers/redis/driver.go238
-rw-r--r--plugins/kv/drivers/redis/plugin.go51
-rw-r--r--plugins/kv/storage.go25
5 files changed, 343 insertions, 19 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 8c3bb68f..b596d4c3 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -80,7 +80,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
}
func (d *Driver) Has(keys ...string) (map[string]bool, error) {
- const op = errors.Op("boltdb_plugin_has")
+ const op = errors.Op("boltdb_driver_has")
d.log.Debug("boltdb HAS method called", "args", keys)
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -121,7 +121,7 @@ func (d *Driver) Has(keys ...string) (map[string]bool, error) {
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
func (d *Driver) Get(key string) ([]byte, error) {
- const op = errors.Op("boltdb_plugin_get")
+ const op = errors.Op("boltdb_driver_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
if keyTrimmed == "" {
@@ -161,7 +161,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
}
func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("boltdb_plugin_mget")
+ const op = errors.Op("boltdb_driver_mget")
// defense
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -213,7 +213,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Set puts the K/V to the bolt
func (d *Driver) Set(items ...kv.Item) error {
- const op = errors.Op("boltdb_plugin_set")
+ const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
@@ -276,7 +276,7 @@ func (d *Driver) Set(items ...kv.Item) error {
// Delete all keys from DB
func (d *Driver) Delete(keys ...string) error {
- const op = errors.Op("boltdb_plugin_delete")
+ const op = errors.Op("boltdb_driver_delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
}
@@ -323,7 +323,7 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
func (d *Driver) MExpire(items ...kv.Item) error {
- const op = errors.Op("boltdb_plugin_mexpire")
+ const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
@@ -341,7 +341,7 @@ func (d *Driver) MExpire(items ...kv.Item) error {
}
func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("boltdb_plugin_ttl")
+ const op = errors.Op("boltdb_driver_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
}
diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go
new file mode 100644
index 00000000..41348236
--- /dev/null
+++ b/plugins/kv/drivers/redis/config.go
@@ -0,0 +1,34 @@
+package redis
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
new file mode 100644
index 00000000..3cc35c13
--- /dev/null
+++ b/plugins/kv/drivers/redis/driver.go
@@ -0,0 +1,238 @@
+package redis
+
+import (
+ "context"
+ "strings"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+var EmptyItem = kv.Item{}
+
+type Driver struct {
+ universalClient redis.UniversalClient
+ log logger.Logger
+ cfg *Config
+}
+
+func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
+ const op = errors.Op("new_boltdb_driver")
+
+ d := &Driver{
+ log: log,
+ }
+
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.cfg.InitDefaults()
+ d.log = log
+
+ d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: d.cfg.Addrs,
+ DB: d.cfg.DB,
+ Username: d.cfg.Username,
+ Password: d.cfg.Password,
+ SentinelPassword: d.cfg.SentinelPassword,
+ MaxRetries: d.cfg.MaxRetries,
+ MinRetryBackoff: d.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: d.cfg.MaxRetryBackoff,
+ DialTimeout: d.cfg.DialTimeout,
+ ReadTimeout: d.cfg.ReadTimeout,
+ WriteTimeout: d.cfg.WriteTimeout,
+ PoolSize: d.cfg.PoolSize,
+ MinIdleConns: d.cfg.MinIdleConns,
+ MaxConnAge: d.cfg.MaxConnAge,
+ PoolTimeout: d.cfg.PoolTimeout,
+ IdleTimeout: d.cfg.IdleTimeout,
+ IdleCheckFrequency: d.cfg.IdleCheckFreq,
+ ReadOnly: d.cfg.ReadOnly,
+ RouteByLatency: d.cfg.RouteByLatency,
+ RouteRandomly: d.cfg.RouteRandomly,
+ MasterName: d.cfg.MasterName,
+ })
+
+ return d, nil
+}
+
+// Has checks if value exists.
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("redis_driver_has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ m := make(map[string]bool, len(keys))
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ exist, err := d.universalClient.Exists(context.Background(), key).Result()
+ if err != nil {
+ return nil, err
+ }
+ if exist == 1 {
+ m[key] = true
+ }
+ }
+ return m, nil
+}
+
+// Get loads key content into slice.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("redis_driver_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ return d.universalClient.Get(context.Background(), key).Bytes()
+}
+
+// MGet loads content of multiple values (some values might be skipped).
+// https://redis.io/commands/mget
+// Returns slice with the interfaces with values
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("redis_driver_mget")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, k := range keys {
+ cmd := d.universalClient.Get(context.Background(), k)
+ if cmd.Err() != nil {
+ if cmd.Err() == redis.Nil {
+ continue
+ }
+ return nil, errors.E(op, cmd.Err())
+ }
+
+ m[k] = cmd.Val()
+ }
+
+ return m, nil
+}
+
+// Set sets value with the TTL in seconds
+// https://redis.io/commands/set
+// Redis `SET key value [expiration]` command.
+//
+// Use expiration for `SETEX`-like behavior.
+// Zero expiration means the key has no expiration time.
+func (d *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("redis_driver_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+ now := time.Now()
+ for _, item := range items {
+ if item == EmptyItem {
+ return errors.E(op, errors.EmptyKey)
+ }
+
+ if item.TTL == "" {
+ err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
+ if err != nil {
+ return err
+ }
+ } else {
+ t, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+ err = d.universalClient.Set(context.Background(), item.Key, item.Value, t.Sub(now)).Err()
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// Delete one or multiple keys.
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("redis_driver_delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+ return d.universalClient.Del(context.Background(), keys...).Err()
+}
+
+// MExpire https://redis.io/commands/expire
+// timeout in RFC3339
+func (d *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("redis_driver_mexpire")
+ now := time.Now()
+ for _, item := range items {
+ if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ t, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+
+ // t guessed to be in future
+ // for Redis we use t.Sub, it will result in seconds, like 4.2s
+ d.universalClient.Expire(context.Background(), item.Key, t.Sub(now))
+ }
+
+ return nil
+}
+
+// TTL https://redis.io/commands/ttl
+// return time in seconds (float64) for a given keys
+func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("redis_driver_ttl")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, key := range keys {
+ duration, err := d.universalClient.TTL(context.Background(), key).Result()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = duration.Seconds()
+ }
+ return m, nil
+}
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
new file mode 100644
index 00000000..d2183411
--- /dev/null
+++ b/plugins/kv/drivers/redis/plugin.go
@@ -0,0 +1,51 @@
+package redis
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "redis"
+
+// Plugin BoltDB K/V storage.
+type Plugin struct {
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ s.log = log
+ s.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (s *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (s *Plugin) Stop() error {
+ return nil
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("redis_plugin_provide")
+ st, err := NewRedisDriver(s.log, key, s.cfgPlugin)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return st, nil
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go
index 42e0ef3e..9f460073 100644
--- a/plugins/kv/storage.go
+++ b/plugins/kv/storage.go
@@ -135,19 +135,20 @@ func (p *Plugin) Serve() chan error {
// save the storage
p.storages[k] = storage
-
case redis:
- // if _, ok := p.drivers[redis]; !ok {
- // continue
- // }
- // storage, err := p.drivers[redis].Configure(configKey)
- // if err != nil {
- // errCh <- errors.E(op, err)
- // return errCh
- // }
- //
- // // save the storage
- // p.storages[k] = storage
+ if _, ok := p.drivers[redis]; !ok {
+ continue
+ }
+
+ storage, err := p.drivers[redis].Provide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+
default:
errCh <- errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))
}