diff options
author | Valery Piashchynski <[email protected]> | 2021-04-22 00:48:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-04-22 00:48:35 +0300 |
commit | e4d65a41ec90747a387cfe769f743327959f7105 (patch) | |
tree | 2b5245fa0a86197d4699fc840c658ffa86cd957b /plugins/kv/drivers/memcached | |
parent | e1e168da92e0dca0e067e08ecb4cf264b9344d45 (diff) |
- General interface, update RPC and Has/Set methods
Diffstat (limited to 'plugins/kv/drivers/memcached')
-rw-r--r-- | plugins/kv/drivers/memcached/config.go | 12 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin.go | 269 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin_unit_test.go | 432 |
3 files changed, 713 insertions, 0 deletions
diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/kv/drivers/memcached/config.go new file mode 100644 index 00000000..7aad53b6 --- /dev/null +++ b/plugins/kv/drivers/memcached/config.go @@ -0,0 +1,12 @@ +package memcached + +type Config struct { + // Addr is url for memcached, 11211 port is used by default + Addr []string +} + +func (s *Config) InitDefaults() { + if s.Addr == nil { + s.Addr = []string{"localhost:11211"} // default url for memcached + } +} diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go new file mode 100644 index 00000000..496042a6 --- /dev/null +++ b/plugins/kv/drivers/memcached/plugin.go @@ -0,0 +1,269 @@ +package memcached + +import ( + "strings" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const PluginName = "memcached" + +var EmptyItem = kv.Item{} + +type Plugin struct { + // config + cfg *Config + // logger + log logger.Logger + // memcached client + client *memcache.Client +} + +// NewMemcachedClient returns a memcache client using the provided server(s) +// with equal weight. If a server is listed multiple times, +// it gets a proportional amount of weight. +func NewMemcachedClient(url string) kv.Storage { + m := memcache.New(url) + return &Plugin{ + client: m, + } +} + +func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { + const op = errors.Op("memcached_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + err := cfg.UnmarshalKey(PluginName, &s.cfg) + if err != nil { + return errors.E(op, err) + } + + s.cfg.InitDefaults() + + s.log = log + return nil +} + +func (s *Plugin) Serve() chan error { + errCh := make(chan error, 1) + s.client = memcache.New(s.cfg.Addr...) + return errCh +} + +// Stop Memcached has no stop/close or smt similar to close the connection +func (s *Plugin) Stop() error { + return nil +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} + +func (s *Plugin) Configure(key string) (kv.Storage, error) { + return s, nil +} + +// Has checks the key for existence +func (s *Plugin) Has(keys ...string) (map[string]bool, error) { + const op = errors.Op("memcached_plugin_has") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + m := make(map[string]bool, len(keys)) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + exist, err := s.client.Get(keys[i]) + + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if exist != nil { + m[keys[i]] = true + } + } + return m, nil +} + +// Get gets the item for the given key. ErrCacheMiss is returned for a +// memcache cache miss. The key must be at most 250 bytes in length. +func (s *Plugin) Get(key string) ([]byte, error) { + const op = errors.Op("memcached_plugin_get") + // to get cases like " " + keyTrimmed := strings.TrimSpace(key) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + data, err := s.client.Get(key) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + return nil, nil + } + return nil, errors.E(op, err) + } + if data != nil { + // return the value by the key + return data.Value, nil + } + // data is nil by some reason and error also nil + return nil, nil +} + +// return map with key -- string +// and map value as value -- []byte +func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("memcached_plugin_mget") + if keys == nil { + return nil, errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return nil, errors.E(op, errors.EmptyKey) + } + } + + m := make(map[string]interface{}, len(keys)) + for i := range keys { + // Here also MultiGet + data, err := s.client.Get(keys[i]) + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return nil, errors.E(op, err) + } + if data != nil { + m[keys[i]] = data.Value + } + } + + return m, nil +} + +// Set sets the KV pairs. Keys should be 250 bytes maximum +// TTL: +// Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (s *Plugin) Set(items ...kv.Item) error { + const op = errors.Op("memcached_plugin_set") + if items == nil { + return errors.E(op, errors.NoKeys) + } + + for i := range items { + if items[i] == EmptyItem { + return errors.E(op, errors.EmptyItem) + } + + // pre-allocate item + memcachedItem := &memcache.Item{ + Key: items[i].Key, + // unsafe convert + Value: []byte(items[i].Value), + Flags: 0, + } + + // add additional TTL in case of TTL isn't empty + if items[i].TTL != "" { + // verify the TTL + t, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return err + } + memcachedItem.Expiration = int32(t.Unix()) + } + + err := s.client.Set(memcachedItem) + if err != nil { + return err + } + } + + return nil +} + +// Expiration is the cache expiration time, in seconds: either a relative +// time from now (up to 1 month), or an absolute Unix epoch time. +// Zero means the Item has no expiration time. +func (s *Plugin) MExpire(items ...kv.Item) error { + const op = errors.Op("memcached_plugin_mexpire") + for i := range items { + if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + return errors.E(op, errors.Str("should set timeout and at least one key")) + } + + // verify provided TTL + t, err := time.Parse(time.RFC3339, items[i].TTL) + if err != nil { + return errors.E(op, err) + } + + // Touch updates the expiry for the given key. The seconds parameter is either + // a Unix timestamp or, if seconds is less than 1 month, the number of seconds + // into the future at which time the item will expire. Zero means the item has + // no expiration time. ErrCacheMiss is returned if the key is not in the cache. + // The key must be at most 250 bytes in length. + err = s.client.Touch(items[i].Key, int32(t.Unix())) + if err != nil { + return errors.E(op, err) + } + } + return nil +} + +// return time in seconds (int32) for a given keys +func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { + const op = errors.Op("memcached_plugin_ttl") + return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) +} + +func (s *Plugin) Delete(keys ...string) error { + const op = errors.Op("memcached_plugin_has") + if keys == nil { + return errors.E(op, errors.NoKeys) + } + + // should not be empty keys + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) + if keyTrimmed == "" { + return errors.E(op, errors.EmptyKey) + } + } + + for i := range keys { + err := s.client.Delete(keys[i]) + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err != nil { + // ErrCacheMiss means that a Get failed because the item wasn't present. + if err == memcache.ErrCacheMiss { + continue + } + return errors.E(op, err) + } + } + return nil +} + +func (s *Plugin) Close() error { + return nil +} diff --git a/plugins/kv/drivers/memcached/plugin_unit_test.go b/plugins/kv/drivers/memcached/plugin_unit_test.go new file mode 100644 index 00000000..31423627 --- /dev/null +++ b/plugins/kv/drivers/memcached/plugin_unit_test.go @@ -0,0 +1,432 @@ +package memcached + +import ( + "strconv" + "sync" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/stretchr/testify/assert" +) + +func initStorage() kv.Storage { + return NewMemcachedClient("localhost:11211") +} + +func cleanup(t *testing.T, s kv.Storage, keys ...string) { + err := s.Delete(keys...) + if err != nil { + t.Fatalf("error during cleanup: %s", err.Error()) + } +} + +func TestStorage_Has(t *testing.T) { + s := initStorage() + + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) +} + +func TestStorage_Has_Set_Has(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) +} + +func TestStorage_Has_Set_MGet(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) +} + +func TestStorage_Has_Set_Get(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + res, err := s.Get("key") + assert.NoError(t, err) + + if string(res) != "hello world" { + t.Fatal("wrong value by key") + } +} + +func TestStorage_Set_Del_Get(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + panic(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + // check that keys are present + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) + + assert.NoError(t, s.Delete("key", "key2")) + // check that keys are not present + res, err = s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 0) +} + +func TestStorage_Set_GetM(t *testing.T) { + s := initStorage() + + defer func() { + cleanup(t, s, "key", "key2") + + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + res, err := s.MGet("key", "key2") + assert.NoError(t, err) + assert.Len(t, res, 2) +} + +func TestStorage_MExpire_TTL(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + // ensure that storage is clean + v, err := s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + // set timeout to 5 sec + nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) + + i1 := kv.Item{ + Key: "key", + Value: "", + TTL: nowPlusFive, + } + i2 := kv.Item{ + Key: "key2", + Value: "", + TTL: nowPlusFive, + } + assert.NoError(t, s.MExpire(i1, i2)) + + time.Sleep(time.Second * 7) + + // ensure that storage is clean + v, err = s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) +} + +func TestNilAndWrongArgs(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key") + if err := s.Close(); err != nil { + panic(err) + } + }() + + // check + v, err := s.Has("key") + assert.NoError(t, err) + assert.False(t, v["key"]) + + _, err = s.Has("") + assert.Error(t, err) + + _, err = s.Get("") + assert.Error(t, err) + + _, err = s.Get(" ") + assert.Error(t, err) + + _, err = s.Get(" ") + assert.Error(t, err) + + _, err = s.MGet("key", "key2", "") + assert.Error(t, err) + + _, err = s.MGet("key", "key2", " ") + assert.Error(t, err) + + assert.Error(t, s.Set(kv.Item{})) + + err = s.Delete("") + assert.Error(t, err) + + err = s.Delete("key", "") + assert.Error(t, err) + + err = s.Delete("key", " ") + assert.Error(t, err) + + err = s.Delete("key") + assert.NoError(t, err) +} + +func TestStorage_SetExpire_TTL(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + // ensure that storage is clean + v, err := s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, + kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) + + // set timeout to 5 sec + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "value", + TTL: nowPlusFive, + }, + kv.Item{ + Key: "key2", + Value: "value", + TTL: nowPlusFive, + })) + + time.Sleep(time.Second * 7) + + // ensure that storage is clean + v, err = s.Has("key", "key2") + assert.NoError(t, err) + assert.False(t, v["key"]) + assert.False(t, v["key2"]) +} + +func TestConcurrentReadWriteTransactions(t *testing.T) { + s := initStorage() + defer func() { + cleanup(t, s, "key", "key2") + if err := s.Close(); err != nil { + t.Fatal(err) + } + }() + + v, err := s.Has("key") + assert.NoError(t, err) + // no such key + assert.False(t, v["key"]) + + assert.NoError(t, s.Set(kv.Item{ + Key: "key", + Value: "hello world", + TTL: "", + }, kv.Item{ + Key: "key2", + Value: "hello world", + TTL: "", + })) + + v, err = s.Has("key", "key2") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + assert.True(t, v["key2"]) + + wg := &sync.WaitGroup{} + wg.Add(3) + + m := &sync.RWMutex{} + // concurrently set the keys + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 100; i++ { + m.Lock() + // set is writable transaction + // it should stop readable + assert.NoError(t, s.Set(kv.Item{ + Key: "key" + strconv.Itoa(i), + Value: "hello world" + strconv.Itoa(i), + TTL: "", + }, kv.Item{ + Key: "key2" + strconv.Itoa(i), + Value: "hello world" + strconv.Itoa(i), + TTL: "", + })) + m.Unlock() + } + }(s) + + // should be no errors + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 100; i++ { + m.RLock() + v, err = s.Has("key") + assert.NoError(t, err) + // no such key + assert.True(t, v["key"]) + m.RUnlock() + } + }(s) + + // should be no errors + go func(s kv.Storage) { + defer wg.Done() + for i := 0; i <= 100; i++ { + m.Lock() + err = s.Delete("key" + strconv.Itoa(i)) + assert.NoError(t, err) + m.Unlock() + } + }(s) + + wg.Wait() +} |