summaryrefslogtreecommitdiff
path: root/plugins/kv
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
committerValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
commitefb3efa98c8555815330274f0618bfc080f4c65c (patch)
treeb3bcabdb22fade6ef06d865d60995bc15f84cf1c /plugins/kv
parent3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff)
Move drivers to the plugin's root.
Fix #771, add tests. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv')
-rw-r--r--plugins/kv/drivers/memcached/config.go12
-rw-r--r--plugins/kv/drivers/memcached/driver.go248
-rw-r--r--plugins/kv/drivers/memcached/plugin.go48
-rw-r--r--plugins/kv/plugin.go116
4 files changed, 12 insertions, 412 deletions
diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/kv/drivers/memcached/config.go
deleted file mode 100644
index 6d413790..00000000
--- a/plugins/kv/drivers/memcached/config.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package memcached
-
-type Config struct {
- // Addr is url for memcached, 11211 port is used by default
- Addr []string
-}
-
-func (s *Config) InitDefaults() {
- if s.Addr == nil {
- s.Addr = []string{"127.0.0.1:11211"} // default url for memcached
- }
-}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
deleted file mode 100644
index e24747fe..00000000
--- a/plugins/kv/drivers/memcached/driver.go
+++ /dev/null
@@ -1,248 +0,0 @@
-package memcached
-
-import (
- "strings"
- "time"
-
- "github.com/bradfitz/gomemcache/memcache"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
-)
-
-type Driver struct {
- client *memcache.Client
- log logger.Logger
- cfg *Config
-}
-
-// NewMemcachedDriver returns a memcache client using the provided server(s)
-// with equal weight. If a server is listed multiple times,
-// it gets a proportional amount of weight.
-func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
- const op = errors.Op("new_memcached_driver")
-
- s := &Driver{
- log: log,
- }
-
- err := cfgPlugin.UnmarshalKey(key, &s.cfg)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- s.cfg.InitDefaults()
-
- m := memcache.New(s.cfg.Addr...)
- s.client = m
-
- return s, nil
-}
-
-// Has checks the key for existence
-func (d *Driver) Has(keys ...string) (map[string]bool, error) {
- const op = errors.Op("memcached_plugin_has")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
- m := make(map[string]bool, len(keys))
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- exist, err := d.client.Get(keys[i])
-
- if err != nil {
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err == memcache.ErrCacheMiss {
- continue
- }
- return nil, errors.E(op, err)
- }
- if exist != nil {
- m[keys[i]] = true
- }
- }
- return m, nil
-}
-
-// Get gets the item for the given key. ErrCacheMiss is returned for a
-// memcache cache miss. The key must be at most 250 bytes in length.
-func (d *Driver) Get(key string) ([]byte, error) {
- const op = errors.Op("memcached_plugin_get")
- // to get cases like " "
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- data, err := d.client.Get(key)
- if err != nil {
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err == memcache.ErrCacheMiss {
- return nil, nil
- }
- return nil, errors.E(op, err)
- }
- if data != nil {
- // return the value by the key
- return data.Value, nil
- }
- // data is nil by some reason and error also nil
- return nil, nil
-}
-
-// MGet return map with key -- string
-// and map value as value -- []byte
-func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
- const op = errors.Op("memcached_plugin_mget")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string][]byte, len(keys))
- for i := range keys {
- // Here also MultiGet
- data, err := d.client.Get(keys[i])
- if err != nil {
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err == memcache.ErrCacheMiss {
- continue
- }
- return nil, errors.E(op, err)
- }
- if data != nil {
- m[keys[i]] = data.Value
- }
- }
-
- return m, nil
-}
-
-// Set sets the KV pairs. Keys should be 250 bytes maximum
-// TTL:
-// Expiration is the cache expiration time, in seconds: either a relative
-// time from now (up to 1 month), or an absolute Unix epoch time.
-// Zero means the Item has no expiration time.
-func (d *Driver) Set(items ...*kvv1.Item) error {
- const op = errors.Op("memcached_plugin_set")
- if items == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- for i := range items {
- if items[i] == nil {
- return errors.E(op, errors.EmptyItem)
- }
-
- // pre-allocate item
- memcachedItem := &memcache.Item{
- Key: items[i].Key,
- // unsafe convert
- Value: items[i].Value,
- Flags: 0,
- }
-
- // add additional TTL in case of TTL isn't empty
- if items[i].Timeout != "" {
- // verify the TTL
- t, err := time.Parse(time.RFC3339, items[i].Timeout)
- if err != nil {
- return err
- }
- memcachedItem.Expiration = int32(t.Unix())
- }
-
- err := d.client.Set(memcachedItem)
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// MExpire Expiration is the cache expiration time, in seconds: either a relative
-// time from now (up to 1 month), or an absolute Unix epoch time.
-// Zero means the Item has no expiration time.
-func (d *Driver) MExpire(items ...*kvv1.Item) error {
- const op = errors.Op("memcached_plugin_mexpire")
- for i := range items {
- if items[i] == nil {
- continue
- }
- if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
- return errors.E(op, errors.Str("should set timeout and at least one key"))
- }
-
- // verify provided TTL
- t, err := time.Parse(time.RFC3339, items[i].Timeout)
- if err != nil {
- return errors.E(op, err)
- }
-
- // Touch updates the expiry for the given key. The seconds parameter is either
- // a Unix timestamp or, if seconds is less than 1 month, the number of seconds
- // into the future at which time the item will expire. Zero means the item has
- // no expiration time. ErrCacheMiss is returned if the key is not in the cache.
- // The key must be at most 250 bytes in length.
- err = d.client.Touch(items[i].Key, int32(t.Unix()))
- if err != nil {
- return errors.E(op, err)
- }
- }
- return nil
-}
-
-// TTL return time in seconds (int32) for a given keys
-func (d *Driver) TTL(_ ...string) (map[string]string, error) {
- const op = errors.Op("memcached_plugin_ttl")
- return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
-}
-
-func (d *Driver) Delete(keys ...string) error {
- const op = errors.Op("memcached_plugin_has")
- if keys == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return errors.E(op, errors.EmptyKey)
- }
- }
-
- for i := range keys {
- err := d.client.Delete(keys[i])
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil {
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err == memcache.ErrCacheMiss {
- continue
- }
- return errors.E(op, err)
- }
- }
- return nil
-}
-
-func (d *Driver) Clear() error {
- err := d.client.DeleteAll()
- if err != nil {
- d.log.Error("flush_all operation failed", "error", err)
- return err
- }
-
- return nil
-}
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
deleted file mode 100644
index 59a2b7cb..00000000
--- a/plugins/kv/drivers/memcached/plugin.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package memcached
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/kv"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "memcached"
- RootPluginName string = "kv"
-)
-
-type Plugin struct {
- // config plugin
- cfgPlugin config.Configurer
- // logger
- log logger.Logger
-}
-
-func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(RootPluginName) {
- return errors.E(errors.Disabled)
- }
-
- s.cfgPlugin = cfg
- s.log = log
- return nil
-}
-
-// Name returns plugin user-friendly name
-func (s *Plugin) Name() string {
- return PluginName
-}
-
-// Available interface implementation
-func (s *Plugin) Available() {}
-
-func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
- const op = errors.Op("boltdb_plugin_provide")
- st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return st, nil
-}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 9a19f96c..c6ca96c3 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -16,11 +16,6 @@ const PluginName string = "kv"
const (
// driver is the mandatory field which should present in every storage
driver string = "driver"
-
- memcached string = "memcached"
- boltdb string = "boltdb"
- redis string = "redis"
- memory string = "memory"
)
// Plugin for the unified storage
@@ -52,40 +47,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (p *Plugin) Serve() chan error { //nolint:gocognit
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("kv_plugin_serve")
// key - storage name in the config
// value - storage
- /*
- For example we can have here 2 storages (but they are not pre-configured)
- for the boltdb and memcached
- We should provide here the actual configs for the all requested storages
- kv:
- boltdb-south:
- driver: boltdb
- dir: "tests/rr-bolt"
- file: "rr.db"
- bucket: "rr"
- permissions: 777
- ttl: 40s
-
- boltdb-north:
- driver: boltdb
- dir: "tests/rr-bolt"
- file: "rr.db"
- bucket: "rr"
- permissions: 777
- ttl: 40s
-
- memcached:
- driver: memcached
- addr: [ "127.0.0.1:11211" ]
-
-
- For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
- when user requests for example boltdb-south, we should provide that particular preconfigured storage
- */
+ // For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
+ // when user requests for example boltdb-south, we should provide that particular preconfigured storage
+
for k, v := range p.cfg.Data {
// for example if the key not properly formatted (yaml)
if v == nil {
@@ -109,30 +78,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
// at this point we know, that driver field present in the configuration
- // TODO(rustatian): refactor, made generic, with checks like in the broadcast, websockets or jobs
- switch v.(map[string]interface{})[driver] {
- case memcached:
- if _, ok := p.constructors[memcached]; !ok {
- p.log.Warn("no memcached constructors registered", "registered", p.constructors)
- continue
- }
-
- storage, err := p.constructors[memcached].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
+ drName := v.(map[string]interface{})[driver]
- case boltdb:
- if _, ok := p.constructors[boltdb]; !ok {
- p.log.Warn("no boltdb constructors registered", "registered", p.constructors)
+ // driver name should be a string
+ if drStr, ok := drName.(string); ok {
+ if _, ok := p.constructors[drStr]; !ok {
+ p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
continue
}
- storage, err := p.constructors[boltdb].KVConstruct(configKey)
+ storage, err := p.constructors[drStr].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -140,56 +95,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- p.log.Warn("no in-memory constructors registered", "registered", p.constructors)
- continue
- }
-
- storage, err := p.constructors[memory].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- p.log.Warn("no redis constructors registered", "registered", p.constructors)
- continue
- }
-
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- storage, err := p.constructors[redis].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- case p.cfgPlugin.Has(redis):
- storage, err := p.constructors[redis].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- continue
- default:
- // otherwise - error, no local or global config
- p.log.Warn("no global or local redis configuration provided", "key", configKey)
- continue
- }
-
- default:
- p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])))
}
+
+ continue
}
return errCh