summaryrefslogtreecommitdiff
path: root/plugins/memcached
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
committerValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
commitefb3efa98c8555815330274f0618bfc080f4c65c (patch)
treeb3bcabdb22fade6ef06d865d60995bc15f84cf1c /plugins/memcached
parent3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff)
Move drivers to the plugin's root.
Fix #771, add tests. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/memcached')
-rw-r--r--plugins/memcached/config.go12
-rw-r--r--plugins/memcached/driver.go248
-rw-r--r--plugins/memcached/plugin.go48
3 files changed, 308 insertions, 0 deletions
diff --git a/plugins/memcached/config.go b/plugins/memcached/config.go
new file mode 100644
index 00000000..6d413790
--- /dev/null
+++ b/plugins/memcached/config.go
@@ -0,0 +1,12 @@
+package memcached
+
+type Config struct {
+ // Addr is url for memcached, 11211 port is used by default
+ Addr []string
+}
+
+func (s *Config) InitDefaults() {
+ if s.Addr == nil {
+ s.Addr = []string{"127.0.0.1:11211"} // default url for memcached
+ }
+}
diff --git a/plugins/memcached/driver.go b/plugins/memcached/driver.go
new file mode 100644
index 00000000..e24747fe
--- /dev/null
+++ b/plugins/memcached/driver.go
@@ -0,0 +1,248 @@
+package memcached
+
+import (
+ "strings"
+ "time"
+
+ "github.com/bradfitz/gomemcache/memcache"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
+)
+
+type Driver struct {
+ client *memcache.Client
+ log logger.Logger
+ cfg *Config
+}
+
+// NewMemcachedDriver returns a memcache client using the provided server(s)
+// with equal weight. If a server is listed multiple times,
+// it gets a proportional amount of weight.
+func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
+ const op = errors.Op("new_memcached_driver")
+
+ s := &Driver{
+ log: log,
+ }
+
+ err := cfgPlugin.UnmarshalKey(key, &s.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ s.cfg.InitDefaults()
+
+ m := memcache.New(s.cfg.Addr...)
+ s.client = m
+
+ return s, nil
+}
+
+// Has checks the key for existence
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("memcached_plugin_has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool, len(keys))
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ exist, err := d.client.Get(keys[i])
+
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
+ }
+ if exist != nil {
+ m[keys[i]] = true
+ }
+ }
+ return m, nil
+}
+
+// Get gets the item for the given key. ErrCacheMiss is returned for a
+// memcache cache miss. The key must be at most 250 bytes in length.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("memcached_plugin_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ data, err := d.client.Get(key)
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ return nil, nil
+ }
+ return nil, errors.E(op, err)
+ }
+ if data != nil {
+ // return the value by the key
+ return data.Value, nil
+ }
+ // data is nil by some reason and error also nil
+ return nil, nil
+}
+
+// MGet return map with key -- string
+// and map value as value -- []byte
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
+ const op = errors.Op("memcached_plugin_mget")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string][]byte, len(keys))
+ for i := range keys {
+ // Here also MultiGet
+ data, err := d.client.Get(keys[i])
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
+ }
+ if data != nil {
+ m[keys[i]] = data.Value
+ }
+ }
+
+ return m, nil
+}
+
+// Set sets the KV pairs. Keys should be 250 bytes maximum
+// TTL:
+// Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (d *Driver) Set(items ...*kvv1.Item) error {
+ const op = errors.Op("memcached_plugin_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for i := range items {
+ if items[i] == nil {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ // pre-allocate item
+ memcachedItem := &memcache.Item{
+ Key: items[i].Key,
+ // unsafe convert
+ Value: items[i].Value,
+ Flags: 0,
+ }
+
+ // add additional TTL in case of TTL isn't empty
+ if items[i].Timeout != "" {
+ // verify the TTL
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
+ if err != nil {
+ return err
+ }
+ memcachedItem.Expiration = int32(t.Unix())
+ }
+
+ err := d.client.Set(memcachedItem)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// MExpire Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
+ const op = errors.Op("memcached_plugin_mexpire")
+ for i := range items {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Touch updates the expiry for the given key. The seconds parameter is either
+ // a Unix timestamp or, if seconds is less than 1 month, the number of seconds
+ // into the future at which time the item will expire. Zero means the item has
+ // no expiration time. ErrCacheMiss is returned if the key is not in the cache.
+ // The key must be at most 250 bytes in length.
+ err = d.client.Touch(items[i].Key, int32(t.Unix()))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ return nil
+}
+
+// TTL return time in seconds (int32) for a given keys
+func (d *Driver) TTL(_ ...string) (map[string]string, error) {
+ const op = errors.Op("memcached_plugin_ttl")
+ return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
+}
+
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("memcached_plugin_has")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for i := range keys {
+ err := d.client.Delete(keys[i])
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return errors.E(op, err)
+ }
+ }
+ return nil
+}
+
+func (d *Driver) Clear() error {
+ err := d.client.DeleteAll()
+ if err != nil {
+ d.log.Error("flush_all operation failed", "error", err)
+ return err
+ }
+
+ return nil
+}
diff --git a/plugins/memcached/plugin.go b/plugins/memcached/plugin.go
new file mode 100644
index 00000000..59a2b7cb
--- /dev/null
+++ b/plugins/memcached/plugin.go
@@ -0,0 +1,48 @@
+package memcached
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/common/kv"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "memcached"
+ RootPluginName string = "kv"
+)
+
+type Plugin struct {
+ // config plugin
+ cfgPlugin config.Configurer
+ // logger
+ log logger.Logger
+}
+
+func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
+ if !cfg.Has(RootPluginName) {
+ return errors.E(errors.Disabled)
+ }
+
+ s.cfgPlugin = cfg
+ s.log = log
+ return nil
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
+// Available interface implementation
+func (s *Plugin) Available() {}
+
+func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return st, nil
+}