summaryrefslogtreecommitdiff
path: root/plugins/kv/memory
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv/memory')
-rw-r--r--plugins/kv/memory/config.go15
-rw-r--r--plugins/kv/memory/plugin.go262
-rw-r--r--plugins/kv/memory/plugin_unit_test.go473
3 files changed, 750 insertions, 0 deletions
diff --git a/plugins/kv/memory/config.go b/plugins/kv/memory/config.go
new file mode 100644
index 00000000..0816f734
--- /dev/null
+++ b/plugins/kv/memory/config.go
@@ -0,0 +1,15 @@
+package memory
+
+// Config is default config for the in-memory driver
+type Config struct {
+ // Enabled or disabled (true or false)
+ Enabled bool
+ // Interval for the check
+ Interval int
+}
+
+// InitDefaults by default driver is turned off
+func (c *Config) InitDefaults() {
+ c.Enabled = false
+ c.Interval = 60 // seconds
+}
diff --git a/plugins/kv/memory/plugin.go b/plugins/kv/memory/plugin.go
new file mode 100644
index 00000000..d2d3721b
--- /dev/null
+++ b/plugins/kv/memory/plugin.go
@@ -0,0 +1,262 @@
+package memory
+
+import (
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// PluginName is user friendly name for the plugin
+const PluginName = "memory"
+
+type Plugin struct {
+ // heap is user map for the key-value pairs
+ heap sync.Map
+ stop chan struct{}
+
+ log logger.Logger
+ cfg *Config
+}
+
+func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("in-memory storage init")
+ s.cfg = &Config{}
+ s.cfg.InitDefaults()
+
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ s.log = log
+
+ s.stop = make(chan struct{}, 1)
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ // start in-memory gc for kv
+ go s.gc()
+
+ return errCh
+}
+
+func (s *Plugin) Stop() error {
+ const op = errors.Op("in-memory storage stop")
+ err := s.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
+func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
+ const op = errors.Op("in-memory storage Has")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+ m := make(map[string]bool)
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if _, ok := s.heap.Load(keys[i]); ok {
+ m[keys[i]] = true
+ }
+ }
+
+ return m, nil
+}
+
+func (s *Plugin) Get(key string) ([]byte, error) {
+ const op = errors.Op("in-memory storage Get")
+ // to get cases like " "
+ keyTrimmed := strings.TrimSpace(key)
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+
+ if data, exist := s.heap.Load(key); exist {
+ // here might be a panic
+ // but data only could be a string, see Set function
+ return []byte(data.(kv.Item).Value), nil
+ }
+ return nil, nil
+}
+
+func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in-memory storage MGet")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if value, ok := s.heap.Load(keys[i]); ok {
+ m[keys[i]] = value.(kv.Item).Value
+ }
+ }
+
+ return m, nil
+}
+
+func (s *Plugin) Set(items ...kv.Item) error {
+ const op = errors.Op("in-memory storage Set")
+ if items == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ for i := range items {
+ // TTL is set
+ if items[i].TTL != "" {
+ // check the TTL in the item
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return err
+ }
+ }
+
+ s.heap.Store(items[i].Key, items[i])
+ }
+ return nil
+}
+
+// MExpire sets the expiration time to the key
+// If key already has the expiration time, it will be overwritten
+func (s *Plugin) MExpire(items ...kv.Item) error {
+ const op = errors.Op("in-memory storage MExpire")
+ for i := range items {
+ if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ return errors.E(op, errors.Str("should set timeout and at least one key"))
+ }
+
+ // if key exist, overwrite it value
+ if pItem, ok := s.heap.Load(items[i].Key); ok {
+ // check that time is correct
+ _, err := time.Parse(time.RFC3339, items[i].TTL)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ tmp := pItem.(kv.Item)
+ // guess that t is in the future
+ // in memory is just FOR TESTING PURPOSES
+ // LOGIC ISN'T IDEAL
+ s.heap.Store(items[i].Key, kv.Item{
+ Key: items[i].Key,
+ Value: tmp.Value,
+ TTL: items[i].TTL,
+ })
+ }
+ }
+
+ return nil
+}
+
+func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
+ const op = errors.Op("in-memory storage TTL")
+ if keys == nil {
+ return nil, errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return nil, errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ m := make(map[string]interface{}, len(keys))
+
+ for i := range keys {
+ if item, ok := s.heap.Load(keys[i]); ok {
+ m[keys[i]] = item.(kv.Item).TTL
+ }
+ }
+ return m, nil
+}
+
+func (s *Plugin) Delete(keys ...string) error {
+ const op = errors.Op("in-memory storage Delete")
+ if keys == nil {
+ return errors.E(op, errors.NoKeys)
+ }
+
+ // should not be empty keys
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
+ if keyTrimmed == "" {
+ return errors.E(op, errors.EmptyKey)
+ }
+ }
+
+ for i := range keys {
+ s.heap.Delete(keys[i])
+ }
+ return nil
+}
+
+// Close clears the in-memory storage
+func (s *Plugin) Close() error {
+ s.stop <- struct{}{}
+ return nil
+}
+
+// RPCService returns associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
+// ================================== PRIVATE ======================================
+
+func (s *Plugin) gc() {
+ // TODO check
+ ticker := time.NewTicker(time.Duration(s.cfg.Interval) * time.Second)
+ for {
+ select {
+ case <-s.stop:
+ ticker.Stop()
+ return
+ case now := <-ticker.C:
+ // check every second
+ s.heap.Range(func(key, value interface{}) bool {
+ v := value.(kv.Item)
+ if v.TTL == "" {
+ return true
+ }
+
+ t, err := time.Parse(time.RFC3339, v.TTL)
+ if err != nil {
+ return false
+ }
+
+ if now.After(t) {
+ s.log.Debug("key deleted", "key", key)
+ s.heap.Delete(key)
+ }
+ return true
+ })
+ }
+ }
+}
diff --git a/plugins/kv/memory/plugin_unit_test.go b/plugins/kv/memory/plugin_unit_test.go
new file mode 100644
index 00000000..d3b24860
--- /dev/null
+++ b/plugins/kv/memory/plugin_unit_test.go
@@ -0,0 +1,473 @@
+package memory
+
+import (
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/stretchr/testify/assert"
+ "go.uber.org/zap"
+)
+
+func initStorage() kv.Storage {
+ p := &Plugin{
+ stop: make(chan struct{}),
+ }
+ p.cfg = &Config{
+ Enabled: true,
+ Interval: 1,
+ }
+
+ l, _ := zap.NewDevelopment()
+ p.log = logger.NewZapAdapter(l)
+
+ go p.gc()
+
+ return p
+}
+
+func cleanup(t *testing.T, s kv.Storage, keys ...string) {
+ err := s.Delete(keys...)
+ if err != nil {
+ t.Fatalf("error during cleanup: %s", err.Error())
+ }
+}
+
+func TestStorage_Has(t *testing.T) {
+ s := initStorage()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+}
+
+func TestStorage_Has_Set_Has(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+}
+
+func TestStorage_Has_Set_MGet(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_Has_Set_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ res, err := s.Get("key")
+ assert.NoError(t, err)
+
+ if string(res) != "value" {
+ t.Fatal("wrong value by key")
+ }
+}
+
+func TestStorage_Set_Del_Get(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ // check that keys are present
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+
+ assert.NoError(t, s.Delete("key", "key2"))
+ // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm
+ res, err = s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 0)
+}
+
+func TestStorage_Set_GetM(t *testing.T) {
+ s := initStorage()
+
+ defer func() {
+ cleanup(t, s, "key", "key2")
+
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: "",
+ }))
+
+ res, err := s.MGet("key", "key2")
+ assert.NoError(t, err)
+ assert.Len(t, res, 2)
+}
+
+func TestStorage_MExpire_TTL(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+ // set timeout to 5 sec
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ i1 := kv.Item{
+ Key: "key",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ i2 := kv.Item{
+ Key: "key2",
+ Value: "",
+ TTL: nowPlusFive,
+ }
+ assert.NoError(t, s.MExpire(i1, i2))
+
+ time.Sleep(time.Second * 6)
+
+ // ensure that storage is clean
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestNilAndWrongArgs(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ if err := s.Close(); err != nil {
+ panic(err)
+ }
+ }()
+
+ // check
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+
+ _, err = s.Has("")
+ assert.Error(t, err)
+
+ _, err = s.Get("")
+ assert.Error(t, err)
+
+ _, err = s.Get(" ")
+ assert.Error(t, err)
+
+ _, err = s.Get(" ")
+ assert.Error(t, err)
+
+ _, err = s.MGet("key", "key2", "")
+ assert.Error(t, err)
+
+ _, err = s.MGet("key", "key2", " ")
+ assert.Error(t, err)
+
+ assert.NoError(t, s.Set(kv.Item{}))
+ _, err = s.Has("key")
+ assert.NoError(t, err)
+
+ err = s.Delete("")
+ assert.Error(t, err)
+
+ err = s.Delete("key", "")
+ assert.Error(t, err)
+
+ err = s.Delete("key", " ")
+ assert.Error(t, err)
+
+ err = s.Delete("key")
+ assert.NoError(t, err)
+}
+
+func TestStorage_SetExpire_TTL(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // ensure that storage is clean
+ v, err := s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
+
+ // set timeout to 5 sec
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "value",
+ TTL: nowPlusFive,
+ },
+ kv.Item{
+ Key: "key2",
+ Value: "value",
+ TTL: nowPlusFive,
+ }))
+
+ time.Sleep(time.Second * 2)
+ m, err := s.TTL("key", "key2")
+ assert.NoError(t, err)
+
+ // remove a precision 4.02342342 -> 4
+ keyTTL, err := strconv.Atoi(m["key"].(string)[0:1])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // remove a precision 4.02342342 -> 4
+ key2TTL, err := strconv.Atoi(m["key"].(string)[0:1])
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.True(t, keyTTL < 5)
+ assert.True(t, key2TTL < 5)
+
+ time.Sleep(time.Second * 4)
+
+ // ensure that storage is clean
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ assert.False(t, v["key"])
+ assert.False(t, v["key2"])
+}
+
+func TestConcurrentReadWriteTransactions(t *testing.T) {
+ s := initStorage()
+ defer func() {
+ cleanup(t, s, "key", "key2")
+ if err := s.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ v, err := s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.False(t, v["key"])
+
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key",
+ Value: "hello world",
+ TTL: "",
+ }, kv.Item{
+ Key: "key2",
+ Value: "hello world",
+ TTL: "",
+ }))
+
+ v, err = s.Has("key", "key2")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ assert.True(t, v["key2"])
+
+ wg := &sync.WaitGroup{}
+ wg.Add(3)
+
+ m := &sync.RWMutex{}
+ // concurrently set the keys
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ // set is writable transaction
+ // it should stop readable
+ assert.NoError(t, s.Set(kv.Item{
+ Key: "key" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }, kv.Item{
+ Key: "key2" + strconv.Itoa(i),
+ Value: "hello world" + strconv.Itoa(i),
+ TTL: "",
+ }))
+ m.Unlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.RLock()
+ v, err = s.Has("key")
+ assert.NoError(t, err)
+ // no such key
+ assert.True(t, v["key"])
+ m.RUnlock()
+ }
+ }(s)
+
+ // should be no errors
+ go func(s kv.Storage) {
+ defer wg.Done()
+ for i := 0; i <= 1000; i++ {
+ m.Lock()
+ err = s.Delete("key" + strconv.Itoa(i))
+ assert.NoError(t, err)
+ m.Unlock()
+ }
+ }(s)
+
+ wg.Wait()
+}