summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugins/kv/drivers/boltdb/driver.go14
-rw-r--r--plugins/kv/drivers/redis/config.go34
-rw-r--r--plugins/kv/drivers/redis/driver.go238
-rw-r--r--plugins/kv/drivers/redis/plugin.go51
-rw-r--r--plugins/kv/storage.go25
-rw-r--r--tests/plugins/kv/configs/.rr-redis.yaml12
-rw-r--r--tests/plugins/kv/storage_plugin_test.go202
7 files changed, 556 insertions, 20 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 8c3bb68f..b596d4c3 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -80,7 +80,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
}
func (d *Driver) Has(keys ...string) (map[string]bool, error) {
- const op = errors.Op("boltdb_plugin_has")
+ const op = errors.Op("boltdb_driver_has")
d.log.Debug("boltdb HAS method called", "args", keys)
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -121,7 +121,7 @@ func (d *Driver) Has(keys ...string) (map[string]bool, error) {
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
func (d *Driver) Get(key string) ([]byte, error) {
- const op = errors.Op("boltdb_plugin_get")
+ const op = errors.Op("boltdb_driver_get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
if keyTrimmed == "" {
@@ -161,7 +161,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
}
func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("boltdb_plugin_mget")
+ const op = errors.Op("boltdb_driver_mget")
// defense
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -213,7 +213,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Set puts the K/V to the bolt
func (d *Driver) Set(items ...kv.Item) error {
- const op = errors.Op("boltdb_plugin_set")
+ const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
@@ -276,7 +276,7 @@ func (d *Driver) Set(items ...kv.Item) error {
// Delete all keys from DB
func (d *Driver) Delete(keys ...string) error {
- const op = errors.Op("boltdb_plugin_delete")
+ const op = errors.Op("boltdb_driver_delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
}
@@ -323,7 +323,7 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
func (d *Driver) MExpire(items ...kv.Item) error {
- const op = errors.Op("boltdb_plugin_mexpire")
+ const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
@@ -341,7 +341,7 @@ func (d *Driver) MExpire(items ...kv.Item) error {
}
func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("boltdb_plugin_ttl")
+ const op = errors.Op("boltdb_driver_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
}
diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go
new file mode 100644
index 00000000..41348236
--- /dev/null
+++ b/plugins/kv/drivers/redis/config.go
@@ -0,0 +1,34 @@
+package redis
+
+import "time"
+
+type Config struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefaults initializing fill config with default values
+func (s *Config) InitDefaults() {
+ if s.Addrs == nil {
+ s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage
+ }
+}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
new file mode 100644
index 00000000..3cc35c13
--- /dev/null
+++ b/plugins/kv/drivers/redis/driver.go
@@ -0,0 +1,238 @@
+package redis
+
+import (
+ "context"
+ "strings"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+var EmptyItem = kv.Item{}
+
+type Driver struct {
+ universalClient redis.UniversalClient
+ log logger.Logger
+ cfg *Config
+}
+
+func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
+ const op = errors.Op("new_boltdb_driver")
+
+ d := &Driver{
+ log: log,
+ }
+
+ err := cfgPlugin.UnmarshalKey(key, &d.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ d.cfg.InitDefaults()
+ d.log = log
+
+ d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: d.cfg.Addrs,
+ DB: d.cfg.DB,
+ Username: d.cfg.Username,
+ Password: d.cfg.Password,
+ SentinelPassword: d.cfg.SentinelPassword,
+ MaxRetries: d.cfg.MaxRetries,
+ MinRetryBackoff: d.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: d.cfg.MaxRetryBackoff,
+ DialTimeout: d.cfg.DialTimeout,
+ ReadTimeout: d.cfg.ReadTimeout,
+ WriteTimeout: d.cfg.WriteTimeout,
+ PoolSize: d.cfg.PoolSize,
+ MinIdleConns: d.cfg.MinIdleConns,
+ MaxConnAge: d.cfg.MaxConnAge,
+ PoolTimeout: d.cfg.PoolTimeout,
+ IdleTimeout: d.cfg.IdleTimeout,
+ IdleCheckFrequency: d.cfg.IdleCheckFreq,
+ ReadOnly: d.cfg.ReadOnly,
+ RouteByLatency: d.cfg.RouteByLatency,
+ RouteRandomly: d.cfg.RouteRandomly,
+ MasterName: d.cfg.MasterName,
+ })
+
+ return d, nil
+}
+
+// Has checks if value exists.
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("redis_driver_has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ m := make(map[string]bool, len(keys))
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ exist, err := d.universalClient.Exists(context.Background(), key).Result()
+ if err != nil {
+ return nil, err
+ }
+ if exist == 1 {
+ m[key] = true
+ }
+ }
+ return m, nil
+}
+
+// Get loads key content into slice.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("redis_driver_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ return d.universalClient.Get(context.Background(), key).Bytes()
+}
+
+// MGet loads content of multiple values (some values might be skipped).
+// https://redis.io/commands/mget
+// Returns slice with the interfaces with values
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("redis_driver_mget")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, k := range keys {
+ cmd := d.universalClient.Get(context.Background(), k)
+ if cmd.Err() != nil {
+ if cmd.Err() == redis.Nil {
+ continue
+ }
+ return nil, errors.E(op, cmd.Err())
+ }
+
+ m[k] = cmd.Val()
+ }
+
+ return m, nil
+}
+
+// Set sets value with the TTL in seconds
+// https://redis.io/commands/set
+// Redis `SET key value [expiration]` command.
+//
+// Use expiration for `SETEX`-like behavior.
+// Zero expiration means the key has no expiration time.
+func (d *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("redis_driver_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+ now := time.Now()
+ for _, item := range items {
+ if item == EmptyItem {
+ return errors.E(op, errors.EmptyKey)
+ }
+
+ if item.TTL == "" {
+ err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
+ if err != nil {
+ return err
+ }
+ } else {
+ t, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+ err = d.universalClient.Set(context.Background(), item.Key, item.Value, t.Sub(now)).Err()
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// Delete one or multiple keys.
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("redis_driver_delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+ return d.universalClient.Del(context.Background(), keys...).Err()
+}
+
+// MExpire https://redis.io/commands/expire
+// timeout in RFC3339
+func (d *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("redis_driver_mexpire")
+ now := time.Now()
+ for _, item := range items {
+ if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ t, err := time.Parse(time.RFC3339, item.TTL)
+ if err != nil {
+ return err
+ }
+
+ // t guessed to be in future
+ // for Redis we use t.Sub, it will result in seconds, like 4.2s
+ d.universalClient.Expire(context.Background(), item.Key, t.Sub(now))
+ }
+
+ return nil
+}
+
+// TTL https://redis.io/commands/ttl
+// return time in seconds (float64) for a given keys
+func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("redis_driver_ttl")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for _, key := range keys {
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for _, key := range keys {
+ duration, err := d.universalClient.TTL(context.Background(), key).Result()
+ if err != nil {
+ return nil, err
+ }
+
+ m[key] = duration.Seconds()
+ }
+ return m, nil
+}
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
new file mode 100644
index 00000000..d2183411
--- /dev/null
+++ b/plugins/kv/drivers/redis/plugin.go
@@ -0,0 +1,51 @@
+package redis
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName = "redis"
+
+// Plugin BoltDB K/V storage.
+type Plugin struct {
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ if !cfg.Has(kv.PluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ s.log = log
+ s.cfgPlugin = cfg
+ return nil
+}
+
+// Serve is noop here
+func (s *Plugin) Serve() chan error {
+ return make(chan error, 1)
+}
+
+func (s *Plugin) Stop() error {
+ return nil
+}
+
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("redis_plugin_provide")
+ st, err := NewRedisDriver(s.log, key, s.cfgPlugin)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return st, nil
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go
index 42e0ef3e..9f460073 100644
--- a/plugins/kv/storage.go
+++ b/plugins/kv/storage.go
@@ -135,19 +135,20 @@ func (p *Plugin) Serve() chan error {
// save the storage
p.storages[k] = storage
-
case redis:
- // if _, ok := p.drivers[redis]; !ok {
- // continue
- // }
- // storage, err := p.drivers[redis].Configure(configKey)
- // if err != nil {
- // errCh <- errors.E(op, err)
- // return errCh
- // }
- //
- // // save the storage
- // p.storages[k] = storage
+ if _, ok := p.drivers[redis]; !ok {
+ continue
+ }
+
+ storage, err := p.drivers[redis].Provide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+
default:
errCh <- errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))
}
diff --git a/tests/plugins/kv/configs/.rr-redis.yaml b/tests/plugins/kv/configs/.rr-redis.yaml
new file mode 100644
index 00000000..0a7396ca
--- /dev/null
+++ b/tests/plugins/kv/configs/.rr-redis.yaml
@@ -0,0 +1,12 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+logs:
+ mode: development
+ level: error
+
+kv:
+ redis-rr:
+ driver: redis
+ addrs:
+ - 'localhost:6379'
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index 0b42e19d..0c322441 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -18,9 +18,9 @@ import (
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory"
+ "github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis"
"github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/redis"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/stretchr/testify/assert"
)
@@ -789,3 +789,203 @@ func testRPCMethodsInMemory(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, ret, 0)
}
+
+func TestRedis(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-redis.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &kv.Plugin{},
+ &redis.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("testRedisRPCMethods", testRPCMethodsRedis)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func testRPCMethodsRedis(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ // add 5 second ttl
+ tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+ keys := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "c",
+ },
+ })
+ data := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
+ {
+ Key: "a",
+ Value: "aa",
+ },
+ {
+ Key: "b",
+ Value: "bb",
+ },
+ {
+ Key: "c",
+ Value: "cc",
+ TTL: tt,
+ },
+ {
+ Key: "d",
+ Value: "dd",
+ },
+ {
+ Key: "e",
+ Value: "ee",
+ },
+ })
+
+ var setRes bool
+
+ // Register 3 keys with values
+ err = client.Call("kv.Set", data, &setRes)
+ assert.NoError(t, err)
+ assert.True(t, setRes)
+
+ ret := make(map[string]bool)
+ err = client.Call("kv.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 3) // should be 3
+
+ // key "c" should be deleted
+ time.Sleep(time.Second * 7)
+
+ ret = make(map[string]bool)
+ err = client.Call("kv.Has", keys, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 2) // should be 2
+
+ mGet := make(map[string]interface{})
+ err = client.Call("kv.MGet", keys, &mGet)
+ assert.NoError(t, err)
+ assert.Len(t, mGet, 2) // c is expired
+ assert.Equal(t, "aa", mGet["a"].(string))
+ assert.Equal(t, "bb", mGet["b"].(string))
+
+ tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
+ data2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
+ {
+ Key: "a",
+ TTL: tt2,
+ },
+ {
+ Key: "b",
+ TTL: tt2,
+ },
+ {
+ Key: "d",
+ TTL: tt2,
+ },
+ })
+
+ // MEXPIRE
+ var mExpRes bool
+ err = client.Call("kv.MExpire", data2, &mExpRes)
+ assert.NoError(t, err)
+ assert.True(t, mExpRes)
+
+ // TTL
+ keys2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "d",
+ },
+ })
+ ttlRes := make(map[string]interface{})
+ err = client.Call("kv.TTL", keys2, &ttlRes)
+ assert.NoError(t, err)
+ assert.Len(t, ttlRes, 3)
+
+ // HAS AFTER TTL
+ time.Sleep(time.Second * 15)
+ ret = make(map[string]bool)
+ err = client.Call("kv.Has", keys2, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 0)
+
+ // DELETE
+ keysDel := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
+ {
+ Key: "e",
+ },
+ })
+ var delRet bool
+ err = client.Call("kv.Delete", keysDel, &delRet)
+ assert.NoError(t, err)
+ assert.True(t, delRet)
+
+ // HAS AFTER DELETE
+ ret = make(map[string]bool)
+ err = client.Call("kv.Has", keysDel, &ret)
+ assert.NoError(t, err)
+ assert.Len(t, ret, 0)
+}