diff options
-rw-r--r-- | plugins/kv/drivers/boltdb/driver.go | 14 | ||||
-rw-r--r-- | plugins/kv/drivers/redis/config.go | 34 | ||||
-rw-r--r-- | plugins/kv/drivers/redis/driver.go | 238 | ||||
-rw-r--r-- | plugins/kv/drivers/redis/plugin.go | 51 | ||||
-rw-r--r-- | plugins/kv/storage.go | 25 | ||||
-rw-r--r-- | tests/plugins/kv/configs/.rr-redis.yaml | 12 | ||||
-rw-r--r-- | tests/plugins/kv/storage_plugin_test.go | 202 |
7 files changed, 556 insertions, 20 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 8c3bb68f..b596d4c3 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -80,7 +80,7 @@ func NewBoltDBDriver(log logger.Logger, key string, cfgPlugin config.Configurer, } func (d *Driver) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("boltdb_plugin_has") + const op = errors.Op("boltdb_driver_has") d.log.Debug("boltdb HAS method called", "args", keys) if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -121,7 +121,7 @@ func (d *Driver) Has(keys ...string) (map[string]bool, error) { // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. func (d *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("boltdb_plugin_get") + const op = errors.Op("boltdb_driver_get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) if keyTrimmed == "" { @@ -161,7 +161,7 @@ func (d *Driver) Get(key string) ([]byte, error) { } func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("boltdb_plugin_mget") + const op = errors.Op("boltdb_driver_mget") // defense if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -213,7 +213,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // Set puts the K/V to the bolt func (d *Driver) Set(items ...kv.Item) error { - const op = errors.Op("boltdb_plugin_set") + const op = errors.Op("boltdb_driver_set") if items == nil { return errors.E(op, errors.NoKeys) } @@ -276,7 +276,7 @@ func (d *Driver) Set(items ...kv.Item) error { // Delete all keys from DB func (d *Driver) Delete(keys ...string) error { - const op = errors.Op("boltdb_plugin_delete") + const op = errors.Op("boltdb_driver_delete") if keys == nil { return errors.E(op, errors.NoKeys) } @@ -323,7 +323,7 @@ func (d *Driver) Delete(keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten func (d *Driver) MExpire(items ...kv.Item) error { - const op = errors.Op("boltdb_plugin_mexpire") + const op = errors.Op("boltdb_driver_mexpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) @@ -341,7 +341,7 @@ func (d *Driver) MExpire(items ...kv.Item) error { } func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { - const op = errors.Op("boltdb_plugin_ttl") + const op = errors.Op("boltdb_driver_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) } diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go new file mode 100644 index 00000000..41348236 --- /dev/null +++ b/plugins/kv/drivers/redis/config.go @@ -0,0 +1,34 @@ +package redis + +import "time" + +type Config struct { + Addrs []string `mapstructure:"addrs"` + DB int `mapstructure:"db"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + MasterName string `mapstructure:"master_name"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + MaxRetries int `mapstructure:"max_retries"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge time.Duration `mapstructure:"max_conn_age"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` +} + +// InitDefaults initializing fill config with default values +func (s *Config) InitDefaults() { + if s.Addrs == nil { + s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage + } +} diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go new file mode 100644 index 00000000..3cc35c13 --- /dev/null +++ b/plugins/kv/drivers/redis/driver.go @@ -0,0 +1,238 @@ +package redis + +import ( + "context" + "strings" + "time" + + "github.com/go-redis/redis/v8" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +var EmptyItem = kv.Item{} + +type Driver struct { + universalClient redis.UniversalClient + log logger.Logger + cfg *Config +} + +func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) { + const op = errors.Op("new_boltdb_driver") + + d := &Driver{ + log: log, + } + + err := cfgPlugin.UnmarshalKey(key, &d.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + d.cfg.InitDefaults() + d.log = log + + d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ + Addrs: d.cfg.Addrs, + DB: d.cfg.DB, + Username: d.cfg.Username, + Password: d.cfg.Password, + SentinelPassword: d.cfg.SentinelPassword, + MaxRetries: d.cfg.MaxRetries, + MinRetryBackoff: d.cfg.MaxRetryBackoff, + MaxRetryBackoff: d.cfg.MaxRetryBackoff, + DialTimeout: d.cfg.DialTimeout, + ReadTimeout: d.cfg.ReadTimeout, + WriteTimeout: d.cfg.WriteTimeout, + PoolSize: d.cfg.PoolSize, + MinIdleConns: d.cfg.MinIdleConns, + MaxConnAge: d.cfg.MaxConnAge, + PoolTimeout: d.cfg.PoolTimeout, + IdleTimeout: d.cfg.IdleTimeout, + IdleCheckFrequency: d.cfg.IdleCheckFreq, + ReadOnly: d.cfg.ReadOnly, + RouteByLatency: d.cfg.RouteByLatency, + RouteRandomly: d.cfg.RouteRandomly, + MasterName: d.cfg.MasterName, + }) + + return d, nil +} + +// Has checks if value exists. +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("redis_driver_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + m := make(map[string]bool, len(keys)) + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + + exist, err := d.universalClient.Exists(context.Background(), key).Result() + if err != nil { + return nil, err + } + if exist == 1 { + m[key] = true + } + } + return m, nil +} + +// Get loads key content into slice. +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("redis_driver_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + return d.universalClient.Get(context.Background(), key).Bytes() +} + +// MGet loads content of multiple values (some values might be skipped). +// https://redis.io/commands/mget +// Returns slice with the interfaces with values +func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("redis_driver_mget") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for _, k := range keys { + cmd := d.universalClient.Get(context.Background(), k) + if cmd.Err() != nil { + if cmd.Err() == redis.Nil { + continue + } + return nil, errors.E(op, cmd.Err()) + } + + m[k] = cmd.Val() + } + + return m, nil +} + +// Set sets value with the TTL in seconds +// https://redis.io/commands/set +// Redis `SET key value [expiration]` command. +// +// Use expiration for `SETEX`-like behavior. +// Zero expiration means the key has no expiration time. +func (d *Driver) Set(items ...kv.Item) error { + const op = errors.Op("redis_driver_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + now := time.Now() + for _, item := range items { + if item == EmptyItem { + return errors.E(op, errors.EmptyKey) + } + + if item.TTL == "" { + err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err() + if err != nil { + return err + } + } else { + t, err := time.Parse(time.RFC3339, item.TTL) + if err != nil { + return err + } + err = d.universalClient.Set(context.Background(), item.Key, item.Value, t.Sub(now)).Err() + if err != nil { + return err + } + } + } + return nil +} + +// Delete one or multiple keys. +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("redis_driver_delete") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + return d.universalClient.Del(context.Background(), keys...).Err() +} + +// MExpire https://redis.io/commands/expire +// timeout in RFC3339 +func (d *Driver) MExpire(items ...kv.Item) error { + const op = errors.Op("redis_driver_mexpire") + now := time.Now() + for _, item := range items { + if item.TTL == "" || strings.TrimSpace(item.Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + t, err := time.Parse(time.RFC3339, item.TTL) + if err != nil { + return err + } + + // t guessed to be in future + // for Redis we use t.Sub, it will result in seconds, like 4.2s + d.universalClient.Expire(context.Background(), item.Key, t.Sub(now)) + } + + return nil +} + +// TTL https://redis.io/commands/ttl +// return time in seconds (float64) for a given keys +func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("redis_driver_ttl") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for _, key := range keys { + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + + for _, key := range keys { + duration, err := d.universalClient.TTL(context.Background(), key).Result() + if err != nil { + return nil, err + } + + m[key] = duration.Seconds() + } + return m, nil +} diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go new file mode 100644 index 00000000..d2183411 --- /dev/null +++ b/plugins/kv/drivers/redis/plugin.go @@ -0,0 +1,51 @@ +package redis + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const PluginName = "redis" + +// Plugin BoltDB K/V storage. +type Plugin struct { + cfgPlugin config.Configurer + // logger + log logger.Logger +} + +func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + if !cfg.Has(kv.PluginName) { + return errors.E(errors.Disabled) + } + + s.log = log + s.cfgPlugin = cfg + return nil +} + +// Serve is noop here +func (s *Plugin) Serve() chan error { + return make(chan error, 1) +} + +func (s *Plugin) Stop() error { + return nil +} + +func (s *Plugin) Provide(key string) (kv.Storage, error) { + const op = errors.Op("redis_plugin_provide") + st, err := NewRedisDriver(s.log, key, s.cfgPlugin) + if err != nil { + return nil, errors.E(op, err) + } + + return st, nil +} + +// Name returns plugin name +func (s *Plugin) Name() string { + return PluginName +} diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go index 42e0ef3e..9f460073 100644 --- a/plugins/kv/storage.go +++ b/plugins/kv/storage.go @@ -135,19 +135,20 @@ func (p *Plugin) Serve() chan error { // save the storage p.storages[k] = storage - case redis: - // if _, ok := p.drivers[redis]; !ok { - // continue - // } - // storage, err := p.drivers[redis].Configure(configKey) - // if err != nil { - // errCh <- errors.E(op, err) - // return errCh - // } - // - // // save the storage - // p.storages[k] = storage + if _, ok := p.drivers[redis]; !ok { + continue + } + + storage, err := p.drivers[redis].Provide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + default: errCh <- errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])) } diff --git a/tests/plugins/kv/configs/.rr-redis.yaml b/tests/plugins/kv/configs/.rr-redis.yaml new file mode 100644 index 00000000..0a7396ca --- /dev/null +++ b/tests/plugins/kv/configs/.rr-redis.yaml @@ -0,0 +1,12 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +logs: + mode: development + level: error + +kv: + redis-rr: + driver: redis + addrs: + - 'localhost:6379' diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index 0b42e19d..0c322441 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -18,9 +18,9 @@ import ( "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" + "github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis" "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/stretchr/testify/assert" ) @@ -789,3 +789,203 @@ func testRPCMethodsInMemory(t *testing.T) { assert.NoError(t, err) assert.Len(t, ret, 0) } + +func TestRedis(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-redis.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &kv.Plugin{}, + &redis.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + t.Run("testRedisRPCMethods", testRPCMethodsRedis) + stopCh <- struct{}{} + wg.Wait() +} + +func testRPCMethodsRedis(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + // add 5 second ttl + tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) + keys := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, + }) + data := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + TTL: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, + }) + + var setRes bool + + // Register 3 keys with values + err = client.Call("kv.Set", data, &setRes) + assert.NoError(t, err) + assert.True(t, setRes) + + ret := make(map[string]bool) + err = client.Call("kv.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 3) // should be 3 + + // key "c" should be deleted + time.Sleep(time.Second * 7) + + ret = make(map[string]bool) + err = client.Call("kv.Has", keys, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 2) // should be 2 + + mGet := make(map[string]interface{}) + err = client.Call("kv.MGet", keys, &mGet) + assert.NoError(t, err) + assert.Len(t, mGet, 2) // c is expired + assert.Equal(t, "aa", mGet["a"].(string)) + assert.Equal(t, "bb", mGet["b"].(string)) + + tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) + data2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ + { + Key: "a", + TTL: tt2, + }, + { + Key: "b", + TTL: tt2, + }, + { + Key: "d", + TTL: tt2, + }, + }) + + // MEXPIRE + var mExpRes bool + err = client.Call("kv.MExpire", data2, &mExpRes) + assert.NoError(t, err) + assert.True(t, mExpRes) + + // TTL + keys2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, + }) + ttlRes := make(map[string]interface{}) + err = client.Call("kv.TTL", keys2, &ttlRes) + assert.NoError(t, err) + assert.Len(t, ttlRes, 3) + + // HAS AFTER TTL + time.Sleep(time.Second * 15) + ret = make(map[string]bool) + err = client.Call("kv.Has", keys2, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) + + // DELETE + keysDel := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ + { + Key: "e", + }, + }) + var delRet bool + err = client.Call("kv.Delete", keysDel, &delRet) + assert.NoError(t, err) + assert.True(t, delRet) + + // HAS AFTER DELETE + ret = make(map[string]bool) + err = client.Call("kv.Has", keysDel, &ret) + assert.NoError(t, err) + assert.Len(t, ret, 0) +} |