diff options
author | Valery Piashchynski <[email protected]> | 2021-08-26 18:32:51 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-26 18:32:51 +0300 |
commit | efb3efa98c8555815330274f0618bfc080f4c65c (patch) | |
tree | b3bcabdb22fade6ef06d865d60995bc15f84cf1c /plugins/memcached | |
parent | 3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff) |
Move drivers to the plugin's root.
Fix #771, add tests.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memcached')
-rw-r--r-- | plugins/memcached/config.go | 12 | ||||
-rw-r--r-- | plugins/memcached/driver.go | 248 | ||||
-rw-r--r-- | plugins/memcached/plugin.go | 48 |
3 files changed, 308 insertions, 0 deletions
diff --git a/plugins/memcached/config.go b/plugins/memcached/config.go new file mode 100644 index 00000000..6d413790 --- /dev/null +++ b/plugins/memcached/config.go @@ -0,0 +1,12 @@ +package memcached + +type Config struct { + // Addr is url for memcached, 11211 port is used by default + Addr []string +} + +func (s *Config) InitDefaults() { + if s.Addr == nil { + s.Addr = []string{"127.0.0.1:11211"} // default url for memcached + } +} diff --git a/plugins/memcached/driver.go b/plugins/memcached/driver.go new file mode 100644 index 00000000..e24747fe --- /dev/null +++ b/plugins/memcached/driver.go @@ -0,0 +1,248 @@ +package memcached + +import ( + "strings" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" +) + +type Driver struct { + client *memcache.Client + log logger.Logger + cfg *Config +} + +// NewMemcachedDriver returns a memcache client using the provided server(s) +// with equal weight. If a server is listed multiple times, +// it gets a proportional amount of weight. +func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { + const op = errors.Op("new_memcached_driver") + + s := &Driver{ + log: log, + } + + err := cfgPlugin.UnmarshalKey(key, &s.cfg) + if err != nil { + return nil, errors.E(op, err) + } + + s.cfg.InitDefaults() + + m := memcache.New(s.cfg.Addr...) + s.client = m + + return s, nil +} + +// Has checks the key for existence +func (d *Driver) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("memcached_plugin_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool, len(keys)) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + exist, err := d.client.Get(keys[i]) + + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if exist != nil { + m[keys[i]] = true + } + } + return m, nil +} + +// Get gets the item for the given key. ErrCacheMiss is returned for a +// memcache cache miss. The key must be at most 250 bytes in length. +func (d *Driver) Get(key string) ([]byte, error) { + const op = errors.Op("memcached_plugin_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + data, err := d.client.Get(key) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + return nil, nil + } + return nil, errors.E(op, err) + } + if data != nil { + // return the value by the key + return data.Value, nil + } + // data is nil by some reason and error also nil + return nil, nil +} + +// MGet return map with key -- string +// and map value as value -- []byte +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { + const op = errors.Op("memcached_plugin_mget") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string][]byte, len(keys)) + for i := range keys { + // Here also MultiGet + data, err := d.client.Get(keys[i]) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if data != nil { + m[keys[i]] = data.Value + } + } + + return m, nil +} + +// Set sets the KV pairs. Keys should be 250 bytes maximum +// TTL: +// Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (d *Driver) Set(items ...*kvv1.Item) error { + const op = errors.Op("memcached_plugin_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + for i := range items { + if items[i] == nil { + return errors.E(op, errors.EmptyItem) + } + + // pre-allocate item + memcachedItem := &memcache.Item{ + Key: items[i].Key, + // unsafe convert + Value: items[i].Value, + Flags: 0, + } + + // add additional TTL in case of TTL isn't empty + if items[i].Timeout != "" { + // verify the TTL + t, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return err + } + memcachedItem.Expiration = int32(t.Unix()) + } + + err := d.client.Set(memcachedItem) + if err != nil { + return err + } + } + + return nil +} + +// MExpire Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (d *Driver) MExpire(items ...*kvv1.Item) error { + const op = errors.Op("memcached_plugin_mexpire") + for i := range items { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // verify provided TTL + t, err := time.Parse(time.RFC3339, items[i].Timeout) + if err != nil { + return errors.E(op, err) + } + + // Touch updates the expiry for the given key. The seconds parameter is either + // a Unix timestamp or, if seconds is less than 1 month, the number of seconds + // into the future at which time the item will expire. Zero means the item has + // no expiration time. ErrCacheMiss is returned if the key is not in the cache. + // The key must be at most 250 bytes in length. + err = d.client.Touch(items[i].Key, int32(t.Unix())) + if err != nil { + return errors.E(op, err) + } + } + return nil +} + +// TTL return time in seconds (int32) for a given keys +func (d *Driver) TTL(_ ...string) (map[string]string, error) { + const op = errors.Op("memcached_plugin_ttl") + return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) +} + +func (d *Driver) Delete(keys ...string) error { + const op = errors.Op("memcached_plugin_has") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + for i := range keys { + err := d.client.Delete(keys[i]) + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return errors.E(op, err) + } + } + return nil +} + +func (d *Driver) Clear() error { + err := d.client.DeleteAll() + if err != nil { + d.log.Error("flush_all operation failed", "error", err) + return err + } + + return nil +} diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go new file mode 100644 index 00000000..59a2b7cb --- /dev/null +++ b/plugins/memcached/plugin.go @@ -0,0 +1,48 @@ +package memcached + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/common/kv" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "memcached" + RootPluginName string = "kv" +) + +type Plugin struct { + // config plugin + cfgPlugin config.Configurer + // logger + log logger.Logger +} + +func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + if !cfg.Has(RootPluginName) { + return errors.E(errors.Disabled) + } + + s.cfgPlugin = cfg + s.log = log + return nil +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} + +// Available interface implementation +func (s *Plugin) Available() {} + +func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { + const op = errors.Op("boltdb_plugin_provide") + st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) + if err != nil { + return nil, errors.E(op, err) + } + + return st, nil +} |