From 75ab1e16c64cfd0a6424fe4c546fdbc5e1b992dd Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 14 Jun 2021 16:39:02 +0300 Subject: - Rework redis with ws plugins Signed-off-by: Valery Piashchynski --- plugins/kv/drivers/boltdb/plugin.go | 2 +- plugins/kv/drivers/memcached/plugin.go | 2 +- plugins/kv/drivers/memory/plugin.go | 2 +- plugins/kv/drivers/redis/config.go | 34 ----- plugins/kv/drivers/redis/driver.go | 242 --------------------------------- plugins/kv/drivers/redis/plugin.go | 51 ------- plugins/kv/interface.go | 2 +- plugins/kv/plugin.go | 208 ++++++++++++++++++++++++++++ plugins/kv/storage.go | 190 -------------------------- 9 files changed, 212 insertions(+), 521 deletions(-) delete mode 100644 plugins/kv/drivers/redis/config.go delete mode 100644 plugins/kv/drivers/redis/driver.go delete mode 100644 plugins/kv/drivers/redis/plugin.go create mode 100644 plugins/kv/plugin.go delete mode 100644 plugins/kv/storage.go (limited to 'plugins/kv') diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 9b4cf9f7..28e2a89c 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -46,7 +46,7 @@ func (s *Plugin) Stop() error { return nil } -func (s *Plugin) Provide(key string) (kv.Storage, error) { +func (s *Plugin) KVProvide(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop) if err != nil { diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 3997e0d4..936b2047 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -34,7 +34,7 @@ func (s *Plugin) Name() string { // Available interface implementation func (s *Plugin) Available() {} -func (s *Plugin) Provide(key string) (kv.Storage, error) { +func (s *Plugin) KVProvide(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go index 2be7caae..da81017e 100644 --- a/plugins/kv/drivers/memory/plugin.go +++ b/plugins/kv/drivers/memory/plugin.go @@ -45,7 +45,7 @@ func (s *Plugin) Stop() error { return nil } -func (s *Plugin) Provide(key string) (kv.Storage, error) { +func (s *Plugin) KVProvide(key string) (kv.Storage, error) { const op = errors.Op("inmemory_plugin_provide") st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop) if err != nil { diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go deleted file mode 100644 index 41348236..00000000 --- a/plugins/kv/drivers/redis/config.go +++ /dev/null @@ -1,34 +0,0 @@ -package redis - -import "time" - -type Config struct { - Addrs []string `mapstructure:"addrs"` - DB int `mapstructure:"db"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - MasterName string `mapstructure:"master_name"` - SentinelPassword string `mapstructure:"sentinel_password"` - RouteByLatency bool `mapstructure:"route_by_latency"` - RouteRandomly bool `mapstructure:"route_randomly"` - MaxRetries int `mapstructure:"max_retries"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` - MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` - PoolSize int `mapstructure:"pool_size"` - MinIdleConns int `mapstructure:"min_idle_conns"` - MaxConnAge time.Duration `mapstructure:"max_conn_age"` - ReadTimeout time.Duration `mapstructure:"read_timeout"` - WriteTimeout time.Duration `mapstructure:"write_timeout"` - PoolTimeout time.Duration `mapstructure:"pool_timeout"` - IdleTimeout time.Duration `mapstructure:"idle_timeout"` - IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` - ReadOnly bool `mapstructure:"read_only"` -} - -// InitDefaults initializing fill config with default values -func (s *Config) InitDefaults() { - if s.Addrs == nil { - s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage - } -} diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go deleted file mode 100644 index 66cb8384..00000000 --- a/plugins/kv/drivers/redis/driver.go +++ /dev/null @@ -1,242 +0,0 @@ -package redis - -import ( - "context" - "strings" - "time" - - "github.com/go-redis/redis/v8" - "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" -) - -type Driver struct { - universalClient redis.UniversalClient - log logger.Logger - cfg *Config -} - -func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) { - const op = errors.Op("new_boltdb_driver") - - d := &Driver{ - log: log, - } - - // will be different for every connected driver - err := cfgPlugin.UnmarshalKey(key, &d.cfg) - if err != nil { - return nil, errors.E(op, err) - } - - d.cfg.InitDefaults() - d.log = log - - d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ - Addrs: d.cfg.Addrs, - DB: d.cfg.DB, - Username: d.cfg.Username, - Password: d.cfg.Password, - SentinelPassword: d.cfg.SentinelPassword, - MaxRetries: d.cfg.MaxRetries, - MinRetryBackoff: d.cfg.MaxRetryBackoff, - MaxRetryBackoff: d.cfg.MaxRetryBackoff, - DialTimeout: d.cfg.DialTimeout, - ReadTimeout: d.cfg.ReadTimeout, - WriteTimeout: d.cfg.WriteTimeout, - PoolSize: d.cfg.PoolSize, - MinIdleConns: d.cfg.MinIdleConns, - MaxConnAge: d.cfg.MaxConnAge, - PoolTimeout: d.cfg.PoolTimeout, - IdleTimeout: d.cfg.IdleTimeout, - IdleCheckFrequency: d.cfg.IdleCheckFreq, - ReadOnly: d.cfg.ReadOnly, - RouteByLatency: d.cfg.RouteByLatency, - RouteRandomly: d.cfg.RouteRandomly, - MasterName: d.cfg.MasterName, - }) - - return d, nil -} - -// Has checks if value exists. -func (d *Driver) Has(keys ...string) (map[string]bool, error) { - const op = errors.Op("redis_driver_has") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - m := make(map[string]bool, len(keys)) - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - - exist, err := d.universalClient.Exists(context.Background(), key).Result() - if err != nil { - return nil, err - } - if exist == 1 { - m[key] = true - } - } - return m, nil -} - -// Get loads key content into slice. -func (d *Driver) Get(key string) ([]byte, error) { - const op = errors.Op("redis_driver_get") - // to get cases like " " - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - return d.universalClient.Get(context.Background(), key).Bytes() -} - -// MGet loads content of multiple values (some values might be skipped). -// https://redis.io/commands/mget -// Returns slice with the interfaces with values -func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { - const op = errors.Op("redis_driver_mget") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string][]byte, len(keys)) - - for _, k := range keys { - cmd := d.universalClient.Get(context.Background(), k) - if cmd.Err() != nil { - if cmd.Err() == redis.Nil { - continue - } - return nil, errors.E(op, cmd.Err()) - } - - m[k] = utils.AsBytes(cmd.Val()) - } - - return m, nil -} - -// Set sets value with the TTL in seconds -// https://redis.io/commands/set -// Redis `SET key value [expiration]` command. -// -// Use expiration for `SETEX`-like behavior. -// Zero expiration means the key has no expiration time. -func (d *Driver) Set(items ...*kvv1.Item) error { - const op = errors.Op("redis_driver_set") - if items == nil { - return errors.E(op, errors.NoKeys) - } - now := time.Now() - for _, item := range items { - if item == nil { - return errors.E(op, errors.EmptyKey) - } - - if item.Timeout == "" { - err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err() - if err != nil { - return err - } - } else { - t, err := time.Parse(time.RFC3339, item.Timeout) - if err != nil { - return err - } - err = d.universalClient.Set(context.Background(), item.Key, item.Value, t.Sub(now)).Err() - if err != nil { - return err - } - } - } - return nil -} - -// Delete one or multiple keys. -func (d *Driver) Delete(keys ...string) error { - const op = errors.Op("redis_driver_delete") - if keys == nil { - return errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return errors.E(op, errors.EmptyKey) - } - } - return d.universalClient.Del(context.Background(), keys...).Err() -} - -// MExpire https://redis.io/commands/expire -// timeout in RFC3339 -func (d *Driver) MExpire(items ...*kvv1.Item) error { - const op = errors.Op("redis_driver_mexpire") - now := time.Now() - for _, item := range items { - if item == nil { - continue - } - if item.Timeout == "" || strings.TrimSpace(item.Key) == "" { - return errors.E(op, errors.Str("should set timeout and at least one key")) - } - - t, err := time.Parse(time.RFC3339, item.Timeout) - if err != nil { - return err - } - - // t guessed to be in future - // for Redis we use t.Sub, it will result in seconds, like 4.2s - d.universalClient.Expire(context.Background(), item.Key, t.Sub(now)) - } - - return nil -} - -// TTL https://redis.io/commands/ttl -// return time in seconds (float64) for a given keys -func (d *Driver) TTL(keys ...string) (map[string]string, error) { - const op = errors.Op("redis_driver_ttl") - if keys == nil { - return nil, errors.E(op, errors.NoKeys) - } - - // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) - if keyTrimmed == "" { - return nil, errors.E(op, errors.EmptyKey) - } - } - - m := make(map[string]string, len(keys)) - - for _, key := range keys { - duration, err := d.universalClient.TTL(context.Background(), key).Result() - if err != nil { - return nil, err - } - - m[key] = duration.String() - } - return m, nil -} diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go deleted file mode 100644 index 3694c5a7..00000000 --- a/plugins/kv/drivers/redis/plugin.go +++ /dev/null @@ -1,51 +0,0 @@ -package redis - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const PluginName = "redis" - -// Plugin BoltDB K/V storage. -type Plugin struct { - cfgPlugin config.Configurer - // logger - log logger.Logger -} - -func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { - if !cfg.Has(kv.PluginName) { - return errors.E(errors.Disabled) - } - - s.log = log - s.cfgPlugin = cfg - return nil -} - -// Serve is noop here -func (s *Plugin) Serve() chan error { - return make(chan error) -} - -func (s *Plugin) Stop() error { - return nil -} - -func (s *Plugin) Provide(key string) (kv.Storage, error) { - const op = errors.Op("redis_plugin_provide") - st, err := NewRedisDriver(s.log, key, s.cfgPlugin) - if err != nil { - return nil, errors.E(op, err) - } - - return st, nil -} - -// Name returns plugin name -func (s *Plugin) Name() string { - return PluginName -} diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 744c6b51..5aedd5c3 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -37,5 +37,5 @@ type StorageDriver interface { // Provider provides storage based on the config type Provider interface { // Provide provides Storage based on the config key - Provide(key string) (Storage, error) + KVProvide(key string) (Storage, error) } diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go new file mode 100644 index 00000000..efe92252 --- /dev/null +++ b/plugins/kv/plugin.go @@ -0,0 +1,208 @@ +package kv + +import ( + "fmt" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const PluginName string = "kv" + +const ( + // driver is the mandatory field which should present in every storage + driver string = "driver" + + memcached string = "memcached" + boltdb string = "boltdb" + redis string = "redis" + memory string = "memory" +) + +// Plugin for the unified storage +type Plugin struct { + log logger.Logger + // drivers contains general storage drivers, such as boltdb, memory, memcached, redis. + drivers map[string]StorageDriver + // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. + storages map[string]Storage + // KV configuration + cfg Config + cfgPlugin config.Configurer +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("kv_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) + if err != nil { + return errors.E(op, err) + } + p.drivers = make(map[string]StorageDriver, 5) + p.storages = make(map[string]Storage, 5) + p.log = log + p.cfgPlugin = cfg + return nil +} + +func (p *Plugin) Serve() chan error { //nolint:gocognit + errCh := make(chan error, 1) + const op = errors.Op("kv_plugin_serve") + // key - storage name in the config + // value - storage + /* + For example we can have here 2 storages (but they are not pre-configured) + for the boltdb and memcached + We should provide here the actual configs for the all requested storages + kv: + boltdb-south: + driver: boltdb + dir: "tests/rr-bolt" + file: "rr.db" + bucket: "rr" + permissions: 777 + ttl: 40s + + boltdb-north: + driver: boltdb + dir: "tests/rr-bolt" + file: "rr.db" + bucket: "rr" + permissions: 777 + ttl: 40s + + memcached: + driver: memcached + addr: [ "localhost:11211" ] + + + For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached + when user requests for example boltdb-south, we should provide that particular preconfigured storage + */ + for k, v := range p.cfg.Data { + if _, ok := v.(map[string]interface{})[driver]; !ok { + errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) + return errCh + } + + // config key for the particular sub-driver kv.memcached + configKey := fmt.Sprintf("%s.%s", PluginName, k) + // at this point we know, that driver field present in the configuration + switch v.(map[string]interface{})[driver] { + case memcached: + if _, ok := p.drivers[memcached]; !ok { + p.log.Warn("no memcached drivers registered", "registered", p.drivers) + continue + } + + storage, err := p.drivers[memcached].KVProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + + case boltdb: + if _, ok := p.drivers[boltdb]; !ok { + p.log.Warn("no boltdb drivers registered", "registered", p.drivers) + continue + } + + storage, err := p.drivers[boltdb].KVProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + case memory: + if _, ok := p.drivers[memory]; !ok { + p.log.Warn("no in-memory drivers registered", "registered", p.drivers) + continue + } + + storage, err := p.drivers[memory].KVProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + case redis: + if _, ok := p.drivers[redis]; !ok { + p.log.Warn("no redis drivers registered", "registered", p.drivers) + continue + } + + // first - try local configuration + switch { + case p.cfgPlugin.Has(configKey): + storage, err := p.drivers[redis].KVProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + case p.cfgPlugin.Has(redis): + storage, err := p.drivers[redis].KVProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + continue + default: + // otherwise - error, no local or global config + p.log.Warn("no global or local redis configuration provided", "key", configKey) + continue + } + + default: + p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))) + } + } + + return errCh +} + +func (p *Plugin) Stop() error { + return nil +} + +// Collects will get all plugins which implement Storage interface +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.GetAllStorageDrivers, + } +} + +func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) { + // save the storage driver + p.drivers[name.Name()] = storage +} + +// RPC returns associated rpc service. +func (p *Plugin) RPC() interface{} { + return &rpc{srv: p, log: p.log, storages: p.storages} +} + +func (p *Plugin) Name() string { + return PluginName +} + +// Available interface implementation +func (p *Plugin) Available() { +} diff --git a/plugins/kv/storage.go b/plugins/kv/storage.go deleted file mode 100644 index 9a609735..00000000 --- a/plugins/kv/storage.go +++ /dev/null @@ -1,190 +0,0 @@ -package kv - -import ( - "fmt" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const PluginName string = "kv" - -const ( - // driver is the mandatory field which should present in every storage - driver string = "driver" - - memcached string = "memcached" - boltdb string = "boltdb" - redis string = "redis" - memory string = "memory" -) - -// Plugin for the unified storage -type Plugin struct { - log logger.Logger - // drivers contains general storage drivers, such as boltdb, memory, memcached, redis. - drivers map[string]StorageDriver - // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. - storages map[string]Storage - // KV configuration - cfg Config - cfgPlugin config.Configurer -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("kv_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) - if err != nil { - return errors.E(op, err) - } - p.drivers = make(map[string]StorageDriver, 5) - p.storages = make(map[string]Storage, 5) - p.log = log - p.cfgPlugin = cfg - return nil -} - -func (p *Plugin) Serve() chan error { - errCh := make(chan error, 1) - const op = errors.Op("kv_plugin_serve") - // key - storage name in the config - // value - storage - /* - For example we can have here 2 storages (but they are not pre-configured) - for the boltdb and memcached - We should provide here the actual configs for the all requested storages - kv: - boltdb-south: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - boltdb-north: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - memcached: - driver: memcached - addr: [ "localhost:11211" ] - - - For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached - when user requests for example boltdb-south, we should provide that particular preconfigured storage - */ - for k, v := range p.cfg.Data { - if _, ok := v.(map[string]interface{})[driver]; !ok { - errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) - return errCh - } - - // config key for the particular sub-driver kv.memcached - configKey := fmt.Sprintf("%s.%s", PluginName, k) - // at this point we know, that driver field present in the configuration - switch v.(map[string]interface{})[driver] { - case memcached: - if _, ok := p.drivers[memcached]; !ok { - p.log.Warn("no memcached drivers registered", "registered", p.drivers) - continue - } - - storage, err := p.drivers[memcached].Provide(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - - case boltdb: - if _, ok := p.drivers[boltdb]; !ok { - p.log.Warn("no boltdb drivers registered", "registered", p.drivers) - continue - } - - storage, err := p.drivers[boltdb].Provide(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - case memory: - if _, ok := p.drivers[memory]; !ok { - p.log.Warn("no in-memory drivers registered", "registered", p.drivers) - continue - } - - storage, err := p.drivers[memory].Provide(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - case redis: - if _, ok := p.drivers[redis]; !ok { - p.log.Warn("no redis drivers registered", "registered", p.drivers) - continue - } - - storage, err := p.drivers[redis].Provide(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - - default: - p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))) - } - } - - return errCh -} - -func (p *Plugin) Stop() error { - return nil -} - -// Collects will get all plugins which implement Storage interface -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.GetAllStorageDrivers, - } -} - -func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) { - // save the storage driver - p.drivers[name.Name()] = storage -} - -// RPC returns associated rpc service. -func (p *Plugin) RPC() interface{} { - return &rpc{srv: p, log: p.log, storages: p.storages} -} - -func (p *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (p *Plugin) Available() { -} -- cgit v1.2.3