summaryrefslogtreecommitdiff
path: root/plugins/kv/drivers/memcached
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-04-22 22:03:59 +0300
committerValery Piashchynski <[email protected]>2021-04-22 22:03:59 +0300
commit91c1fa2e2693cb662425c1ba7cca2325a8458995 (patch)
treed97791a6675678f607396af5de81143e764ca108 /plugins/kv/drivers/memcached
parent1e62c2afa4fe8b5bae0c26e72ae61ef6b5f0f54d (diff)
- Rework storage drivers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv/drivers/memcached')
-rw-r--r--plugins/kv/drivers/memcached/driver.go235
-rw-r--r--plugins/kv/drivers/memcached/plugin.go228
-rw-r--r--plugins/kv/drivers/memcached/plugin_unit_test.go432
3 files changed, 240 insertions, 655 deletions
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
new file mode 100644
index 00000000..17b06fa0
--- /dev/null
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -0,0 +1,235 @@
+package memcached
+
+import (
+ "strings"
+ "time"
+
+ "github.com/bradfitz/gomemcache/memcache"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type Driver struct {
+ client *memcache.Client
+ log logger.Logger
+ cfg *Config
+}
+
+// NewMemcachedDriver returns a memcache client using the provided server(s)
+// with equal weight. If a server is listed multiple times,
+// it gets a proportional amount of weight.
+func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (kv.Storage, error) {
+ const op = errors.Op("new_memcached_driver")
+
+ s := &Driver{
+ log: log,
+ }
+
+ err := cfgPlugin.UnmarshalKey(key, &s.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ s.cfg.InitDefaults()
+
+ m := memcache.New(s.cfg.Addr...)
+ s.client = m
+
+ return s, nil
+}
+
+// Has checks the key for existence
+func (d *Driver) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("memcached_plugin_has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool, len(keys))
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ exist, err := d.client.Get(keys[i])
+
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
+ }
+ if exist != nil {
+ m[keys[i]] = true
+ }
+ }
+ return m, nil
+}
+
+// Get gets the item for the given key. ErrCacheMiss is returned for a
+// memcache cache miss. The key must be at most 250 bytes in length.
+func (d *Driver) Get(key string) ([]byte, error) {
+ const op = errors.Op("memcached_plugin_get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ data, err := d.client.Get(key)
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ return nil, nil
+ }
+ return nil, errors.E(op, err)
+ }
+ if data != nil {
+ // return the value by the key
+ return data.Value, nil
+ }
+ // data is nil by some reason and error also nil
+ return nil, nil
+}
+
+// MGet return map with key -- string
+// and map value as value -- []byte
+func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("memcached_plugin_mget")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+ for i := range keys {
+ // Here also MultiGet
+ data, err := d.client.Get(keys[i])
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return nil, errors.E(op, err)
+ }
+ if data != nil {
+ m[keys[i]] = data.Value
+ }
+ }
+
+ return m, nil
+}
+
+// Set sets the KV pairs. Keys should be 250 bytes maximum
+// TTL:
+// Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (d *Driver) Set(items ...kv.Item) error {
+ const op = errors.Op("memcached_plugin_set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for i := range items {
+ if items[i] == EmptyItem {
+ return errors.E(op, errors.EmptyItem)
+ }
+
+ // pre-allocate item
+ memcachedItem := &memcache.Item{
+ Key: items[i].Key,
+ // unsafe convert
+ Value: []byte(items[i].Value),
+ Flags: 0,
+ }
+
+ // add additional TTL in case of TTL isn't empty
+ if items[i].TTL != "" {
+ // verify the TTL
+ t, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return err
+ }
+ memcachedItem.Expiration = int32(t.Unix())
+ }
+
+ err := d.client.Set(memcachedItem)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// MExpire Expiration is the cache expiration time, in seconds: either a relative
+// time from now (up to 1 month), or an absolute Unix epoch time.
+// Zero means the Item has no expiration time.
+func (d *Driver) MExpire(items ...kv.Item) error {
+ const op = errors.Op("memcached_plugin_mexpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // verify provided TTL
+ t, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Touch updates the expiry for the given key. The seconds parameter is either
+ // a Unix timestamp or, if seconds is less than 1 month, the number of seconds
+ // into the future at which time the item will expire. Zero means the item has
+ // no expiration time. ErrCacheMiss is returned if the key is not in the cache.
+ // The key must be at most 250 bytes in length.
+ err = d.client.Touch(items[i].Key, int32(t.Unix()))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ return nil
+}
+
+// TTL return time in seconds (int32) for a given keys
+func (d *Driver) TTL(_ ...string) (map[string]interface{}, error) {
+ const op = errors.Op("memcached_plugin_ttl")
+ return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
+}
+
+func (d *Driver) Delete(keys ...string) error {
+ const op = errors.Op("memcached_plugin_has")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for i := range keys {
+ err := d.client.Delete(keys[i])
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err != nil {
+ // ErrCacheMiss means that a Get failed because the item wasn't present.
+ if err == memcache.ErrCacheMiss {
+ continue
+ }
+ return errors.E(op, err)
+ }
+ }
+ return nil
+}
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index dff47cb8..af59e91b 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -1,10 +1,6 @@
package memcached
import (
- "strings"
- "time"
-
- "github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
@@ -16,23 +12,10 @@ const PluginName = "memcached"
var EmptyItem = kv.Item{}
type Plugin struct {
- // config
- cfg *Config
+ // config plugin
cfgPlugin config.Configurer
// logger
log logger.Logger
- // memcached client
- client *memcache.Client
-}
-
-// NewMemcachedClient returns a memcache client using the provided server(s)
-// with equal weight. If a server is listed multiple times,
-// it gets a proportional amount of weight.
-func NewMemcachedClient(url ...string) kv.Storage {
- m := memcache.New(url...)
- return &Plugin{
- client: m,
- }
}
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
@@ -50,213 +33,12 @@ func (s *Plugin) Name() string {
return PluginName
}
-func (s *Plugin) Configure(key string) (kv.Storage, error) {
- const op = errors.Op("memcached_plugin_configure")
- err := s.cfgPlugin.UnmarshalKey(key, &s.cfg)
+func (s *Plugin) Provide(key string) (kv.Storage, error) {
+ const op = errors.Op("boltdb_plugin_provide")
+ st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
return nil, errors.E(op, err)
}
- // initialize default keys
- s.cfg.InitDefaults()
-
- return NewMemcachedClient(s.cfg.Addr...), nil
-}
-
-// Has checks the key for existence
-func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
- const op = errors.Op("memcached_plugin_has")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
- m := make(map[string]bool, len(keys))
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- exist, err := s.client.Get(keys[i])
-
- if err != nil {
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err == memcache.ErrCacheMiss {
- continue
- }
- return nil, errors.E(op, err)
- }
- if exist != nil {
- m[keys[i]] = true
- }
- }
- return m, nil
-}
-
-// Get gets the item for the given key. ErrCacheMiss is returned for a
-// memcache cache miss. The key must be at most 250 bytes in length.
-func (s *Plugin) Get(key string) ([]byte, error) {
- const op = errors.Op("memcached_plugin_get")
- // to get cases like " "
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- data, err := s.client.Get(key)
- if err != nil {
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err == memcache.ErrCacheMiss {
- return nil, nil
- }
- return nil, errors.E(op, err)
- }
- if data != nil {
- // return the value by the key
- return data.Value, nil
- }
- // data is nil by some reason and error also nil
- return nil, nil
-}
-
-// MGet return map with key -- string
-// and map value as value -- []byte
-func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("memcached_plugin_mget")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string]interface{}, len(keys))
- for i := range keys {
- // Here also MultiGet
- data, err := s.client.Get(keys[i])
- if err != nil {
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err == memcache.ErrCacheMiss {
- continue
- }
- return nil, errors.E(op, err)
- }
- if data != nil {
- m[keys[i]] = data.Value
- }
- }
-
- return m, nil
-}
-
-// Set sets the KV pairs. Keys should be 250 bytes maximum
-// TTL:
-// Expiration is the cache expiration time, in seconds: either a relative
-// time from now (up to 1 month), or an absolute Unix epoch time.
-// Zero means the Item has no expiration time.
-func (s *Plugin) Set(items ...kv.Item) error {
- const op = errors.Op("memcached_plugin_set")
- if items == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- for i := range items {
- if items[i] == EmptyItem {
- return errors.E(op, errors.EmptyItem)
- }
-
- // pre-allocate item
- memcachedItem := &memcache.Item{
- Key: items[i].Key,
- // unsafe convert
- Value: []byte(items[i].Value),
- Flags: 0,
- }
-
- // add additional TTL in case of TTL isn't empty
- if items[i].TTL != "" {
- // verify the TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
- if err != nil {
- return err
- }
- memcachedItem.Expiration = int32(t.Unix())
- }
-
- err := s.client.Set(memcachedItem)
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// MExpire Expiration is the cache expiration time, in seconds: either a relative
-// time from now (up to 1 month), or an absolute Unix epoch time.
-// Zero means the Item has no expiration time.
-func (s *Plugin) MExpire(items ...kv.Item) error {
- const op = errors.Op("memcached_plugin_mexpire")
- for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
- return errors.E(op, errors.Str("should set timeout and at least one key"))
- }
-
- // verify provided TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
- if err != nil {
- return errors.E(op, err)
- }
-
- // Touch updates the expiry for the given key. The seconds parameter is either
- // a Unix timestamp or, if seconds is less than 1 month, the number of seconds
- // into the future at which time the item will expire. Zero means the item has
- // no expiration time. ErrCacheMiss is returned if the key is not in the cache.
- // The key must be at most 250 bytes in length.
- err = s.client.Touch(items[i].Key, int32(t.Unix()))
- if err != nil {
- return errors.E(op, err)
- }
- }
- return nil
-}
-
-// TTL return time in seconds (int32) for a given keys
-func (s *Plugin) TTL(_ ...string) (map[string]interface{}, error) {
- const op = errors.Op("memcached_plugin_ttl")
- return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
-}
-
-func (s *Plugin) Delete(keys ...string) error {
- const op = errors.Op("memcached_plugin_has")
- if keys == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return errors.E(op, errors.EmptyKey)
- }
- }
-
- for i := range keys {
- err := s.client.Delete(keys[i])
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err != nil {
- // ErrCacheMiss means that a Get failed because the item wasn't present.
- if err == memcache.ErrCacheMiss {
- continue
- }
- return errors.E(op, err)
- }
- }
- return nil
-}
-
-func (s *Plugin) Close() error {
- return nil
+ return st, nil
}
diff --git a/plugins/kv/drivers/memcached/plugin_unit_test.go b/plugins/kv/drivers/memcached/plugin_unit_test.go
deleted file mode 100644
index 31423627..00000000
--- a/plugins/kv/drivers/memcached/plugin_unit_test.go
+++ /dev/null
@@ -1,432 +0,0 @@
-package memcached
-
-import (
- "strconv"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/stretchr/testify/assert"
-)
-
-func initStorage() kv.Storage {
- return NewMemcachedClient("localhost:11211")
-}
-
-func cleanup(t *testing.T, s kv.Storage, keys ...string) {
- err := s.Delete(keys...)
- if err != nil {
- t.Fatalf("error during cleanup: %s", err.Error())
- }
-}
-
-func TestStorage_Has(t *testing.T) {
- s := initStorage()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-}
-
-func TestStorage_Has_Set_Has(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-}
-
-func TestStorage_Has_Set_MGet(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_Has_Set_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.Get("key")
- assert.NoError(t, err)
-
- if string(res) != "hello world" {
- t.Fatal("wrong value by key")
- }
-}
-
-func TestStorage_Set_Del_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- // check that keys are present
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-
- assert.NoError(t, s.Delete("key", "key2"))
- // check that keys are not present
- res, err = s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 0)
-}
-
-func TestStorage_Set_GetM(t *testing.T) {
- s := initStorage()
-
- defer func() {
- cleanup(t, s, "key", "key2")
-
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_MExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
- // set timeout to 5 sec
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- i1 := kv.Item{
- Key: "key",
- Value: "",
- TTL: nowPlusFive,
- }
- i2 := kv.Item{
- Key: "key2",
- Value: "",
- TTL: nowPlusFive,
- }
- assert.NoError(t, s.MExpire(i1, i2))
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestNilAndWrongArgs(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- // check
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- _, err = s.Has("")
- assert.Error(t, err)
-
- _, err = s.Get("")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", "")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", " ")
- assert.Error(t, err)
-
- assert.Error(t, s.Set(kv.Item{}))
-
- err = s.Delete("")
- assert.Error(t, err)
-
- err = s.Delete("key", "")
- assert.Error(t, err)
-
- err = s.Delete("key", " ")
- assert.Error(t, err)
-
- err = s.Delete("key")
- assert.NoError(t, err)
-}
-
-func TestStorage_SetExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- // set timeout to 5 sec
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: nowPlusFive,
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: nowPlusFive,
- }))
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestConcurrentReadWriteTransactions(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- wg := &sync.WaitGroup{}
- wg.Add(3)
-
- m := &sync.RWMutex{}
- // concurrently set the keys
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- // set is writable transaction
- // it should stop readable
- assert.NoError(t, s.Set(kv.Item{
- Key: "key" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }, kv.Item{
- Key: "key2" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }))
- m.Unlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.RLock()
- v, err = s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- m.RUnlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- err = s.Delete("key" + strconv.Itoa(i))
- assert.NoError(t, err)
- m.Unlock()
- }
- }(s)
-
- wg.Wait()
-}