summaryrefslogtreecommitdiff
path: root/plugins/kv/memory
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-04-22 00:48:35 +0300
committerValery Piashchynski <[email protected]>2021-04-22 00:48:35 +0300
commite4d65a41ec90747a387cfe769f743327959f7105 (patch)
tree2b5245fa0a86197d4699fc840c658ffa86cd957b /plugins/kv/memory
parente1e168da92e0dca0e067e08ecb4cf264b9344d45 (diff)
- General interface, update RPC and Has/Set methods
Diffstat (limited to 'plugins/kv/memory')
-rw-r--r--plugins/kv/memory/config.go14
-rw-r--r--plugins/kv/memory/plugin.go264
-rw-r--r--plugins/kv/memory/plugin_unit_test.go472
3 files changed, 0 insertions, 750 deletions
diff --git a/plugins/kv/memory/config.go b/plugins/kv/memory/config.go
deleted file mode 100644
index e51d09c5..00000000
--- a/plugins/kv/memory/config.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package memory
-
-// Config is default config for the in-memory driver
-type Config struct {
- // Interval for the check
- Interval int
-}
-
-// InitDefaults by default driver is turned off
-func (c *Config) InitDefaults() {
- if c.Interval == 0 {
- c.Interval = 60 // seconds
- }
-}
diff --git a/plugins/kv/memory/plugin.go b/plugins/kv/memory/plugin.go
deleted file mode 100644
index 4201a1c0..00000000
--- a/plugins/kv/memory/plugin.go
+++ /dev/null
@@ -1,264 +0,0 @@
-package memory
-
-import (
- "strings"
- "sync"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-// PluginName is user friendly name for the plugin
-const PluginName = "memory"
-
-type Plugin struct {
- // heap is user map for the key-value pairs
- heap sync.Map
- stop chan struct{}
-
- log logger.Logger
- cfg *Config
-}
-
-func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("in_memory_plugin_init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
- err := cfg.UnmarshalKey(PluginName, &s.cfg)
- if err != nil {
- return errors.E(op, err)
- }
-
- s.cfg.InitDefaults()
- s.log = log
-
- s.stop = make(chan struct{}, 1)
- return nil
-}
-
-func (s *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
- // start in-memory gc for kv
- go s.gc()
-
- return errCh
-}
-
-func (s *Plugin) Stop() error {
- const op = errors.Op("in_memory_plugin_stop")
- err := s.Close()
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
- const op = errors.Op("in_memory_plugin_has")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
- m := make(map[string]bool)
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
-
- if _, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = true
- }
- }
-
- return m, nil
-}
-
-func (s *Plugin) Get(key string) ([]byte, error) {
- const op = errors.Op("in_memory_plugin_get")
- // to get cases like " "
- keyTrimmed := strings.TrimSpace(key)
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
-
- if data, exist := s.heap.Load(key); exist {
- // here might be a panic
- // but data only could be a string, see Set function
- return []byte(data.(kv.Item).Value), nil
- }
- return nil, nil
-}
-
-func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("in_memory_plugin_mget")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string]interface{}, len(keys))
-
- for i := range keys {
- if value, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = value.(kv.Item).Value
- }
- }
-
- return m, nil
-}
-
-func (s *Plugin) Set(items ...kv.Item) error {
- const op = errors.Op("in_memory_plugin_set")
- if items == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- for i := range items {
- // TTL is set
- if items[i].TTL != "" {
- // check the TTL in the item
- _, err := time.Parse(time.RFC3339, items[i].TTL)
- if err != nil {
- return err
- }
- }
-
- s.heap.Store(items[i].Key, items[i])
- }
- return nil
-}
-
-// MExpire sets the expiration time to the key
-// If key already has the expiration time, it will be overwritten
-func (s *Plugin) MExpire(items ...kv.Item) error {
- const op = errors.Op("in_memory_plugin_mexpire")
- for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
- return errors.E(op, errors.Str("should set timeout and at least one key"))
- }
-
- // if key exist, overwrite it value
- if pItem, ok := s.heap.Load(items[i].Key); ok {
- // check that time is correct
- _, err := time.Parse(time.RFC3339, items[i].TTL)
- if err != nil {
- return errors.E(op, err)
- }
- tmp := pItem.(kv.Item)
- // guess that t is in the future
- // in memory is just FOR TESTING PURPOSES
- // LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, kv.Item{
- Key: items[i].Key,
- Value: tmp.Value,
- TTL: items[i].TTL,
- })
- }
- }
-
- return nil
-}
-
-func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
- const op = errors.Op("in_memory_plugin_ttl")
- if keys == nil {
- return nil, errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return nil, errors.E(op, errors.EmptyKey)
- }
- }
-
- m := make(map[string]interface{}, len(keys))
-
- for i := range keys {
- if item, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = item.(kv.Item).TTL
- }
- }
- return m, nil
-}
-
-func (s *Plugin) Delete(keys ...string) error {
- const op = errors.Op("in_memory_plugin_delete")
- if keys == nil {
- return errors.E(op, errors.NoKeys)
- }
-
- // should not be empty keys
- for i := range keys {
- keyTrimmed := strings.TrimSpace(keys[i])
- if keyTrimmed == "" {
- return errors.E(op, errors.EmptyKey)
- }
- }
-
- for i := range keys {
- s.heap.Delete(keys[i])
- }
- return nil
-}
-
-// Close clears the in-memory storage
-func (s *Plugin) Close() error {
- s.stop <- struct{}{}
- return nil
-}
-
-// RPCService returns associated rpc service.
-func (s *Plugin) RPC() interface{} {
- return kv.NewRPCServer(s, s.log)
-}
-
-// Name returns plugin user-friendly name
-func (s *Plugin) Name() string {
- return PluginName
-}
-
-// ================================== PRIVATE ======================================
-
-func (s *Plugin) gc() {
- // TODO check
- ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second)
- for {
- select {
- case <-s.stop:
- ticker.Stop()
- return
- case now := <-ticker.C:
- // check every second
- s.heap.Range(func(key, value interface{}) bool {
- v := value.(kv.Item)
- if v.TTL == "" {
- return true
- }
-
- t, err := time.Parse(time.RFC3339, v.TTL)
- if err != nil {
- return false
- }
-
- if now.After(t) {
- s.log.Debug("key deleted", "key", key)
- s.heap.Delete(key)
- }
- return true
- })
- }
- }
-}
diff --git a/plugins/kv/memory/plugin_unit_test.go b/plugins/kv/memory/plugin_unit_test.go
deleted file mode 100644
index 1965a696..00000000
--- a/plugins/kv/memory/plugin_unit_test.go
+++ /dev/null
@@ -1,472 +0,0 @@
-package memory
-
-import (
- "strconv"
- "sync"
- "testing"
- "time"
-
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/stretchr/testify/assert"
- "go.uber.org/zap"
-)
-
-func initStorage() kv.Storage {
- p := &Plugin{
- stop: make(chan struct{}),
- }
- p.cfg = &Config{
- Interval: 1,
- }
-
- l, _ := zap.NewDevelopment()
- p.log = logger.NewZapAdapter(l)
-
- go p.gc()
-
- return p
-}
-
-func cleanup(t *testing.T, s kv.Storage, keys ...string) {
- err := s.Delete(keys...)
- if err != nil {
- t.Fatalf("error during cleanup: %s", err.Error())
- }
-}
-
-func TestStorage_Has(t *testing.T) {
- s := initStorage()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-}
-
-func TestStorage_Has_Set_Has(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-}
-
-func TestStorage_Has_Set_MGet(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_Has_Set_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- res, err := s.Get("key")
- assert.NoError(t, err)
-
- if string(res) != "value" {
- t.Fatal("wrong value by key")
- }
-}
-
-func TestStorage_Set_Del_Get(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- // check that keys are present
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-
- assert.NoError(t, s.Delete("key", "key2"))
- // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm
- res, err = s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 0)
-}
-
-func TestStorage_Set_GetM(t *testing.T) {
- s := initStorage()
-
- defer func() {
- cleanup(t, s, "key", "key2")
-
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: "",
- }))
-
- res, err := s.MGet("key", "key2")
- assert.NoError(t, err)
- assert.Len(t, res, 2)
-}
-
-func TestStorage_MExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
-
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
- // set timeout to 5 sec
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- i1 := kv.Item{
- Key: "key",
- Value: "",
- TTL: nowPlusFive,
- }
- i2 := kv.Item{
- Key: "key2",
- Value: "",
- TTL: nowPlusFive,
- }
- assert.NoError(t, s.MExpire(i1, i2))
-
- time.Sleep(time.Second * 7)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestNilAndWrongArgs(t *testing.T) {
- s := initStorage()
- defer func() {
- if err := s.Close(); err != nil {
- panic(err)
- }
- }()
-
- // check
- v, err := s.Has("key")
- assert.NoError(t, err)
- assert.False(t, v["key"])
-
- _, err = s.Has("")
- assert.Error(t, err)
-
- _, err = s.Get("")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.Get(" ")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", "")
- assert.Error(t, err)
-
- _, err = s.MGet("key", "key2", " ")
- assert.Error(t, err)
-
- assert.NoError(t, s.Set(kv.Item{}))
- _, err = s.Has("key")
- assert.NoError(t, err)
-
- err = s.Delete("")
- assert.Error(t, err)
-
- err = s.Delete("key", "")
- assert.Error(t, err)
-
- err = s.Delete("key", " ")
- assert.Error(t, err)
-
- err = s.Delete("key")
- assert.NoError(t, err)
-}
-
-func TestStorage_SetExpire_TTL(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- // ensure that storage is clean
- v, err := s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- },
- kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- // set timeout to 5 sec
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "value",
- TTL: nowPlusFive,
- },
- kv.Item{
- Key: "key2",
- Value: "value",
- TTL: nowPlusFive,
- }))
-
- time.Sleep(time.Second * 2)
- m, err := s.TTL("key", "key2")
- assert.NoError(t, err)
-
- // remove a precision 4.02342342 -> 4
- keyTTL, err := strconv.Atoi(m["key"].(string)[0:1])
- if err != nil {
- t.Fatal(err)
- }
-
- // remove a precision 4.02342342 -> 4
- key2TTL, err := strconv.Atoi(m["key"].(string)[0:1])
- if err != nil {
- t.Fatal(err)
- }
-
- assert.True(t, keyTTL < 5)
- assert.True(t, key2TTL < 5)
-
- time.Sleep(time.Second * 4)
-
- // ensure that storage is clean
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- assert.False(t, v["key"])
- assert.False(t, v["key2"])
-}
-
-func TestConcurrentReadWriteTransactions(t *testing.T) {
- s := initStorage()
- defer func() {
- cleanup(t, s, "key", "key2")
- if err := s.Close(); err != nil {
- t.Fatal(err)
- }
- }()
-
- v, err := s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.False(t, v["key"])
-
- assert.NoError(t, s.Set(kv.Item{
- Key: "key",
- Value: "hello world",
- TTL: "",
- }, kv.Item{
- Key: "key2",
- Value: "hello world",
- TTL: "",
- }))
-
- v, err = s.Has("key", "key2")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- assert.True(t, v["key2"])
-
- wg := &sync.WaitGroup{}
- wg.Add(3)
-
- m := &sync.RWMutex{}
- // concurrently set the keys
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- // set is writable transaction
- // it should stop readable
- assert.NoError(t, s.Set(kv.Item{
- Key: "key" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }, kv.Item{
- Key: "key2" + strconv.Itoa(i),
- Value: "hello world" + strconv.Itoa(i),
- TTL: "",
- }))
- m.Unlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.RLock()
- v, err = s.Has("key")
- assert.NoError(t, err)
- // no such key
- assert.True(t, v["key"])
- m.RUnlock()
- }
- }(s)
-
- // should be no errors
- go func(s kv.Storage) {
- defer wg.Done()
- for i := 0; i <= 100; i++ {
- m.Lock()
- err = s.Delete("key" + strconv.Itoa(i))
- assert.NoError(t, err)
- m.Unlock()
- }
- }(s)
-
- wg.Wait()
-}