summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-07 01:06:50 +0300
committerValery Piashchynski <[email protected]>2021-01-07 01:06:50 +0300
commitc1465d3bcdf24a78440300aa51e7cfc92ce874a8 (patch)
tree6e0f5107eba90df73724b6611ca6adfa148d2a3f /plugins
parentc9f670ee734355cbc5d504186946b7db67cf62b5 (diff)
KV, updated, bug fixed, with intergration tests via plugins
Diffstat (limited to 'plugins')
-rw-r--r--plugins/kv/boltdb/config.go11
-rw-r--r--plugins/kv/boltdb/plugin.go148
-rw-r--r--plugins/kv/boltdb/plugin_unit_test.go172
-rw-r--r--plugins/kv/interface.go20
-rw-r--r--plugins/kv/memcached/plugin.go25
-rw-r--r--plugins/kv/memcached/storage_test.go108
-rw-r--r--plugins/kv/memory/plugin.go (renamed from plugins/kv/memory/storage.go)31
-rw-r--r--plugins/kv/memory/storage_test.go114
-rw-r--r--plugins/kv/rpc.go110
9 files changed, 431 insertions, 308 deletions
diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/boltdb/config.go
index 6b116611..b2e1e636 100644
--- a/plugins/kv/boltdb/config.go
+++ b/plugins/kv/boltdb/config.go
@@ -8,16 +8,17 @@ type Config struct {
File string
// Bucket to store data in boltDB
Bucket string
-
+ // db file permissions
Permissions int
- TTL int
+ // timeout
+ Interval uint `yaml:"interval"`
}
-func (s *Config) InitDefaults() error {
+// InitDefaults initializes default values for the boltdb
+func (s *Config) InitDefaults() {
s.Dir = "." // current dir
s.Bucket = "rr" // default bucket name
s.File = "rr.db" // default file name
s.Permissions = 0777 // free for all
- s.TTL = 60 // 60 seconds is default TTL
- return nil
+ s.Interval = 60 // default is 60 seconds timeout
}
diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/boltdb/plugin.go
index e5eda0c2..6cfc49f6 100644
--- a/plugins/kv/boltdb/plugin.go
+++ b/plugins/kv/boltdb/plugin.go
@@ -2,7 +2,6 @@ package boltdb
import (
"bytes"
- "context"
"encoding/gob"
"os"
"path"
@@ -41,78 +40,23 @@ type Plugin struct {
stop chan struct{}
}
-// NewBoltClient instantiate new BOLTDB client
-// The parameters are:
-// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created
-// perm os.FileMode -- file permissions, for example 0777
-// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other
-// bucket string -- name of the bucket to use, should be UTF-8
-func NewBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) {
- const op = errors.Op("newBoltClient")
- db, err := bolt.Open(path, perm, options)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // bucket should be SET
- if bucket == "" {
- return nil, errors.E(op, errors.Str("bucket should be set"))
- }
-
- // create bucket if it does not exist
- // tx.Commit invokes via the db.Update
- err = db.Update(func(tx *bolt.Tx) error {
- _, err = tx.CreateBucketIfNotExists([]byte(bucket))
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- })
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // if TTL is not set, make it default
- if ttl == 0 {
- ttl = time.Minute
- }
-
- s := &Plugin{
- DB: db,
- bucket: []byte(bucket),
- stop: make(chan struct{}),
- timeout: ttl,
- gc: &sync.Map{},
- }
-
- // start the TTL gc
- go s.gcPhase()
-
- return s, nil
-}
-
func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
const op = errors.Op("boltdb plugin init")
s.cfg = &Config{}
+ s.cfg.InitDefaults()
+
err := cfg.UnmarshalKey(PluginName, &s.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
+ // set the logger
s.log = log
- return nil
-}
-
-func (s *Plugin) Serve() chan error {
- const op = errors.Op("boltdb serve")
- errCh := make(chan error, 1)
-
db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil)
if err != nil {
- errCh <- errors.E(op, err)
- return errCh
+ return errors.E(op, err)
}
// create bucket if it does not exist
@@ -125,24 +69,29 @@ func (s *Plugin) Serve() chan error {
}
return nil
})
+
if err != nil {
- errCh <- err
- return errCh
+ return errors.E(op, err)
}
s.DB = db
s.bucket = []byte(s.cfg.Bucket)
s.stop = make(chan struct{})
- s.timeout = time.Duration(s.cfg.TTL) * time.Second
+ s.timeout = time.Duration(s.cfg.Interval) * time.Second
s.gc = &sync.Map{}
+ return nil
+}
+
+func (s *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
// start the TTL gc
go s.gcPhase()
return errCh
}
-func (s Plugin) Stop() error {
+func (s *Plugin) Stop() error {
const op = errors.Op("boltdb stop")
err := s.Close()
if err != nil {
@@ -151,8 +100,9 @@ func (s Plugin) Stop() error {
return nil
}
-func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("boltdb Has")
+ s.log.Debug("boltdb HAS method called", "args", keys)
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
}
@@ -164,8 +114,8 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
if keyTrimmed == "" {
return errors.E(op, errors.EmptyKey)
}
@@ -173,24 +123,25 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
if b == nil {
return errors.E(op, errors.NoSuchBucket)
}
- exist := b.Get([]byte(key))
+ exist := b.Get([]byte(keys[i]))
if exist != nil {
- m[key] = true
+ m[keys[i]] = true
}
}
return nil
})
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
+ s.log.Debug("boltdb HAS method finished")
return m, nil
}
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
-func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+func (s *Plugin) Get(key string) ([]byte, error) {
const op = errors.Op("boltdb Get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -211,7 +162,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
buf := bytes.NewReader(val)
decoder := gob.NewDecoder(buf)
- i := kv.Item{}
+ var i string
err := decoder.Decode(&i)
if err != nil {
// unsafe (w/o runes) convert
@@ -219,7 +170,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
}
// set the value
- val = []byte(i.Value)
+ val = []byte(i)
}
return nil
})
@@ -230,7 +181,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
return val, nil
}
-func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("boltdb MGet")
// defence
if keys == nil {
@@ -238,8 +189,8 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
}
// should not be empty keys
- for _, key := range keys {
- keyTrimmed := strings.TrimSpace(key)
+ for i := range keys {
+ keyTrimmed := strings.TrimSpace(keys[i])
if keyTrimmed == "" {
return nil, errors.E(op, errors.EmptyKey)
}
@@ -253,10 +204,22 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
return errors.E(op, errors.NoSuchBucket)
}
- for _, key := range keys {
- value := b.Get([]byte(key))
+ buf := new(bytes.Buffer)
+ var out string
+ buf.Grow(100)
+ for i := range keys {
+ value := b.Get([]byte(keys[i]))
+ buf.Write(value)
+ // allocate enough space
+ dec := gob.NewDecoder(buf)
if value != nil {
- m[key] = value
+ err := dec.Decode(&out)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ m[keys[i]] = out
+ buf.Reset()
+ out = ""
}
}
@@ -270,7 +233,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
}
// Set puts the K/V to the bolt
-func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) Set(items ...kv.Item) error {
const op = errors.Op("boltdb Set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -303,7 +266,8 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
return errors.E(op, errors.EmptyItem)
}
- err = encoder.Encode(&items[i])
+ // Encode value
+ err = encoder.Encode(&items[i].Value)
if err != nil {
return errors.E(op, err)
}
@@ -321,6 +285,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
if err != nil {
return errors.E(op, err)
}
+ // Store key TTL in the separate map
s.gc.Store(items[i].Key, items[i].TTL)
}
@@ -331,7 +296,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
}
// Delete all keys from DB
-func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+func (s *Plugin) Delete(keys ...string) error {
const op = errors.Op("boltdb Delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -378,7 +343,7 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) MExpire(items ...kv.Item) error {
const op = errors.Op("boltdb MExpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
@@ -396,7 +361,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
return nil
}
-func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("boltdb TTL")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -422,15 +387,25 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}
}
// Close the DB connection
-func (s Plugin) Close() error {
+func (s *Plugin) Close() error {
// stop the keys GC
s.stop <- struct{}{}
return s.DB.Close()
}
+// RPCService returns associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
// ========================= PRIVATE =================================
-func (s Plugin) gcPhase() {
+func (s *Plugin) gcPhase() {
t := time.NewTicker(s.timeout)
defer t.Stop()
for {
@@ -449,6 +424,7 @@ func (s Plugin) gcPhase() {
if now.After(v) {
// time expired
s.gc.Delete(k)
+ s.log.Debug("key deleted", "key", k)
err := s.DB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
if b == nil {
diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go
index a12c830d..2459e493 100644
--- a/plugins/kv/boltdb/plugin_unit_test.go
+++ b/plugins/kv/boltdb/plugin_unit_test.go
@@ -1,19 +1,74 @@
package boltdb
import (
- "context"
"os"
"strconv"
"sync"
"testing"
"time"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/kv"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/stretchr/testify/assert"
+ bolt "go.etcd.io/bbolt"
+ "go.uber.org/zap"
)
+// NewBoltClient instantiate new BOLTDB client
+// The parameters are:
+// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created
+// perm os.FileMode -- file permissions, for example 0777
+// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other
+// bucket string -- name of the bucket to use, should be UTF-8
+func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) {
+ const op = errors.Op("newBoltClient")
+ db, err := bolt.Open(path, perm, options)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // bucket should be SET
+ if bucket == "" {
+ return nil, errors.E(op, errors.Str("bucket should be set"))
+ }
+
+ // create bucket if it does not exist
+ // tx.Commit invokes via the db.Update
+ err = db.Update(func(tx *bolt.Tx) error {
+ _, err = tx.CreateBucketIfNotExists([]byte(bucket))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if TTL is not set, make it default
+ if ttl == 0 {
+ ttl = time.Minute
+ }
+
+ l, _ := zap.NewDevelopment()
+ s := &Plugin{
+ DB: db,
+ bucket: []byte(bucket),
+ stop: make(chan struct{}),
+ timeout: ttl,
+ gc: &sync.Map{},
+ log: logger.NewZapAdapter(l),
+ }
+
+ // start the TTL gc
+ go s.gcPhase()
+
+ return s, nil
+}
+
func initStorage() kv.Storage {
- storage, err := NewBoltClient("rr.db", 0777, nil, "rr", time.Second)
+ storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second)
if err != nil {
panic(err)
}
@@ -36,7 +91,7 @@ func TestStorage_Has(t *testing.T) {
cleanup(t, "rr.db")
}()
- v, err := s.Has(context.Background(), "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
}
@@ -50,13 +105,12 @@ func TestStorage_Has_Set_Has(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -66,7 +120,7 @@ func TestStorage_Has_Set_Has(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -82,13 +136,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -98,7 +151,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -115,7 +168,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
m.Lock()
// set is writable transaction
// it should stop readable
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key" + strconv.Itoa(i),
Value: "hello world" + strconv.Itoa(i),
TTL: "",
@@ -133,7 +186,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.RLock()
- v, err = s.Has(ctx, "key")
+ v, err = s.Has("key")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -146,7 +199,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.Lock()
- err = s.Delete(ctx, "key"+strconv.Itoa(i))
+ err = s.Delete("key" + strconv.Itoa(i))
assert.NoError(t, err)
m.Unlock()
}
@@ -164,13 +217,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -180,13 +232,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
@@ -200,13 +252,12 @@ func TestStorage_Has_Set_Get(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -216,13 +267,13 @@ func TestStorage_Has_Set_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
- // no such key
+
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.Get(ctx, "key")
+ res, err := s.Get("key")
assert.NoError(t, err)
if string(res) != "hello world" {
@@ -239,13 +290,12 @@ func TestStorage_Set_Del_Get(t *testing.T) {
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -255,20 +305,20 @@ func TestStorage_Set_Del_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
// check that keys are present
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
- assert.NoError(t, s.Delete(ctx, "key", "key2"))
+ assert.NoError(t, s.Delete("key", "key2"))
// check that keys are not present
- res, err = s.MGet(ctx, "key", "key2")
+ res, err = s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 0)
}
@@ -281,13 +331,12 @@ func TestStorage_Set_GetM(t *testing.T) {
}
cleanup(t, "rr.db")
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -297,14 +346,13 @@ func TestStorage_Set_GetM(t *testing.T) {
TTL: "",
}))
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
func TestNilAndWrongArgs(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
if err := s.Close(); err != nil {
panic(err)
@@ -313,61 +361,60 @@ func TestNilAndWrongArgs(t *testing.T) {
}()
// check
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- _, err = s.Has(ctx, "")
+ _, err = s.Has("")
assert.Error(t, err)
- _, err = s.Get(ctx, "")
+ _, err = s.Get("")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", "")
+ _, err = s.MGet("key", "key2", "")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", " ")
+ _, err = s.MGet("key", "key2", " ")
assert.Error(t, err)
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
}))
- assert.Error(t, s.Set(ctx, kv.Item{
+ assert.Error(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "asdf",
}))
- _, err = s.Has(ctx, "key")
+ _, err = s.Has("key")
assert.NoError(t, err)
- assert.Error(t, s.Set(ctx, kv.Item{}))
+ assert.Error(t, s.Set(kv.Item{}))
- err = s.Delete(ctx, "")
+ err = s.Delete("")
assert.Error(t, err)
- err = s.Delete(ctx, "key", "")
+ err = s.Delete("key", "")
assert.Error(t, err)
- err = s.Delete(ctx, "key", " ")
+ err = s.Delete("key", " ")
assert.Error(t, err)
- err = s.Delete(ctx, "key")
+ err = s.Delete("key")
assert.NoError(t, err)
}
func TestStorage_MExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
if err := s.Close(); err != nil {
panic(err)
@@ -376,12 +423,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -404,12 +451,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
Value: "",
TTL: nowPlusFive,
}
- assert.NoError(t, s.MExpire(ctx, i1, i2))
+ assert.NoError(t, s.MExpire(i1, i2))
time.Sleep(time.Second * 6)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -417,7 +464,6 @@ func TestStorage_MExpire_TTL(t *testing.T) {
func TestStorage_SetExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
if err := s.Close(); err != nil {
panic(err)
@@ -426,12 +472,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -445,7 +491,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
// set timeout to 5 sec
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: nowPlusFive,
@@ -457,7 +503,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}))
time.Sleep(time.Second * 2)
- m, err := s.TTL(ctx, "key", "key2")
+ m, err := s.TTL("key", "key2")
assert.NoError(t, err)
// remove a precision 4.02342342 -> 4
@@ -478,7 +524,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
time.Sleep(time.Second * 4)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 3512fd73..c1367cdf 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,10 +1,6 @@
package kv
// Item represents general storage item
-import (
- "context"
-)
-
type Item struct {
// Key of item
Key string
@@ -17,28 +13,28 @@ type Item struct {
// Storage represents single abstract storage.
type Storage interface {
// Has checks if value exists.
- Has(ctx context.Context, keys ...string) (map[string]bool, error)
+ Has(keys ...string) (map[string]bool, error)
// Get loads value content into a byte slice.
- Get(ctx context.Context, key string) ([]byte, error)
+ Get(key string) ([]byte, error)
// MGet loads content of multiple values
- // If there are no values for keys, key will no be in the map
- MGet(ctx context.Context, keys ...string) (map[string]interface{}, error)
+ // Returns the map with existing keys and associated values
+ MGet(keys ...string) (map[string]interface{}, error)
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
- Set(ctx context.Context, items ...Item) error
+ Set(items ...Item) error
// MExpire sets the TTL for multiply keys
- MExpire(ctx context.Context, items ...Item) error
+ MExpire(items ...Item) error
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
- TTL(ctx context.Context, keys ...string) (map[string]interface{}, error)
+ TTL(keys ...string) (map[string]interface{}, error)
// Delete one or multiple keys.
- Delete(ctx context.Context, keys ...string) error
+ Delete(keys ...string) error
// Close closes the storage and underlying resources.
Close() error
diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go
index 69f96bfe..f5111c04 100644
--- a/plugins/kv/memcached/plugin.go
+++ b/plugins/kv/memcached/plugin.go
@@ -1,7 +1,6 @@
package memcached
import (
- "context"
"strings"
"time"
@@ -58,8 +57,18 @@ func (s *Plugin) Stop() error {
return nil
}
+// RPCService returns associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
// Has checks the key for existence
-func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+func (s Plugin) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("memcached Has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -84,7 +93,7 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
// Get gets the item for the given key. ErrCacheMiss is returned for a
// memcache cache miss. The key must be at most 250 bytes in length.
-func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+func (s Plugin) Get(key string) ([]byte, error) {
const op = errors.Op("memcached Get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -106,7 +115,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
// return map with key -- string
// and map value as value -- []byte
-func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s Plugin) MGet(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("memcached MGet")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -141,7 +150,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+func (s Plugin) Set(items ...kv.Item) error {
const op = errors.Op("memcached Set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -182,7 +191,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+func (s Plugin) MExpire(items ...kv.Item) error {
const op = errors.Op("memcached MExpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
@@ -209,12 +218,12 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
}
// return time in seconds (int32) for a given keys
-func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s Plugin) TTL(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("memcached HTTLas")
return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
}
-func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+func (s Plugin) Delete(keys ...string) error {
const op = errors.Op("memcached Has")
if keys == nil {
return errors.E(op, errors.NoKeys)
diff --git a/plugins/kv/memcached/storage_test.go b/plugins/kv/memcached/storage_test.go
index 4b59bbd0..3d37748b 100644
--- a/plugins/kv/memcached/storage_test.go
+++ b/plugins/kv/memcached/storage_test.go
@@ -1,7 +1,6 @@
package memcached
import (
- "context"
"strconv"
"sync"
"testing"
@@ -16,7 +15,7 @@ func initStorage() kv.Storage {
}
func cleanup(t *testing.T, s kv.Storage, keys ...string) {
- err := s.Delete(context.Background(), keys...)
+ err := s.Delete(keys...)
if err != nil {
t.Fatalf("error during cleanup: %s", err.Error())
}
@@ -25,9 +24,7 @@ func cleanup(t *testing.T, s kv.Storage, keys ...string) {
func TestStorage_Has(t *testing.T) {
s := initStorage()
- ctx := context.Background()
-
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
}
@@ -41,13 +38,12 @@ func TestStorage_Has_Set_Has(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -57,7 +53,7 @@ func TestStorage_Has_Set_Has(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -73,13 +69,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -89,13 +84,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
@@ -109,13 +104,12 @@ func TestStorage_Has_Set_Get(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -125,13 +119,13 @@ func TestStorage_Has_Set_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.Get(ctx, "key")
+ res, err := s.Get("key")
assert.NoError(t, err)
if string(res) != "hello world" {
@@ -148,13 +142,12 @@ func TestStorage_Set_Del_Get(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -164,27 +157,26 @@ func TestStorage_Set_Del_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
// check that keys are present
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
- assert.NoError(t, s.Delete(ctx, "key", "key2"))
+ assert.NoError(t, s.Delete("key", "key2"))
// check that keys are not present
- res, err = s.MGet(ctx, "key", "key2")
+ res, err = s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 0)
}
func TestStorage_Set_GetM(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
@@ -194,11 +186,11 @@ func TestStorage_Set_GetM(t *testing.T) {
}
}()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -208,14 +200,13 @@ func TestStorage_Set_GetM(t *testing.T) {
TTL: "",
}))
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
func TestStorage_MExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
if err := s.Close(); err != nil {
@@ -224,12 +215,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -252,12 +243,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
Value: "",
TTL: nowPlusFive,
}
- assert.NoError(t, s.MExpire(ctx, i1, i2))
+ assert.NoError(t, s.MExpire(i1, i2))
time.Sleep(time.Second * 6)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -265,7 +256,6 @@ func TestStorage_MExpire_TTL(t *testing.T) {
func TestNilAndWrongArgs(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key")
if err := s.Close(); err != nil {
@@ -274,46 +264,45 @@ func TestNilAndWrongArgs(t *testing.T) {
}()
// check
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- _, err = s.Has(ctx, "")
+ _, err = s.Has("")
assert.Error(t, err)
- _, err = s.Get(ctx, "")
+ _, err = s.Get("")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", "")
+ _, err = s.MGet("key", "key2", "")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", " ")
+ _, err = s.MGet("key", "key2", " ")
assert.Error(t, err)
- assert.Error(t, s.Set(ctx, kv.Item{}))
+ assert.Error(t, s.Set(kv.Item{}))
- err = s.Delete(ctx, "")
+ err = s.Delete("")
assert.Error(t, err)
- err = s.Delete(ctx, "key", "")
+ err = s.Delete("key", "")
assert.Error(t, err)
- err = s.Delete(ctx, "key", " ")
+ err = s.Delete("key", " ")
assert.Error(t, err)
- err = s.Delete(ctx, "key")
+ err = s.Delete("key")
assert.NoError(t, err)
}
func TestStorage_SetExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
if err := s.Close(); err != nil {
@@ -322,12 +311,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -341,7 +330,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
// set timeout to 5 sec
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: nowPlusFive,
@@ -355,7 +344,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
time.Sleep(time.Second * 6)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -370,13 +359,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -386,7 +374,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -403,7 +391,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
m.Lock()
// set is writable transaction
// it should stop readable
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key" + strconv.Itoa(i),
Value: "hello world" + strconv.Itoa(i),
TTL: "",
@@ -421,7 +409,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.RLock()
- v, err = s.Has(ctx, "key")
+ v, err = s.Has("key")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -434,7 +422,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.Lock()
- err = s.Delete(ctx, "key"+strconv.Itoa(i))
+ err = s.Delete("key" + strconv.Itoa(i))
assert.NoError(t, err)
m.Unlock()
}
diff --git a/plugins/kv/memory/storage.go b/plugins/kv/memory/plugin.go
index f4bdacea..2c65f14c 100644
--- a/plugins/kv/memory/storage.go
+++ b/plugins/kv/memory/plugin.go
@@ -1,7 +1,6 @@
package memory
import (
- "context"
"strings"
"sync"
"time"
@@ -49,7 +48,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (s Plugin) Serve() chan error {
+func (s *Plugin) Serve() chan error {
errCh := make(chan error, 1)
// start in-memory gc for kv
go s.gc()
@@ -57,7 +56,7 @@ func (s Plugin) Serve() chan error {
return errCh
}
-func (s Plugin) Stop() error {
+func (s *Plugin) Stop() error {
const op = errors.Op("in-memory storage stop")
err := s.Close()
if err != nil {
@@ -66,7 +65,7 @@ func (s Plugin) Stop() error {
return nil
}
-func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) {
+func (s *Plugin) Has(keys ...string) (map[string]bool, error) {
const op = errors.Op("in-memory storage Has")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -86,7 +85,7 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error
return m, nil
}
-func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
+func (s *Plugin) Get(key string) ([]byte, error) {
const op = errors.Op("in-memory storage Get")
// to get cases like " "
keyTrimmed := strings.TrimSpace(key)
@@ -102,7 +101,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) {
return nil, nil
}
-func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("in-memory storage MGet")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -127,7 +126,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{
return m, nil
}
-func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) Set(items ...kv.Item) error {
const op = errors.Op("in-memory storage Set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -150,7 +149,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
+func (s *Plugin) MExpire(items ...kv.Item) error {
const op = errors.Op("in-memory storage MExpire")
for i := range items {
if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
@@ -179,7 +178,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error {
return nil
}
-func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) {
+func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) {
const op = errors.Op("in-memory storage TTL")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -203,7 +202,7 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}
return m, nil
}
-func (s Plugin) Delete(ctx context.Context, keys ...string) error {
+func (s *Plugin) Delete(keys ...string) error {
const op = errors.Op("in-memory storage Delete")
if keys == nil {
return errors.E(op, errors.NoKeys)
@@ -224,11 +223,21 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error {
}
// Close clears the in-memory storage
-func (s Plugin) Close() error {
+func (s *Plugin) Close() error {
s.stop <- struct{}{}
return nil
}
+// RPCService returns associated rpc service.
+func (s *Plugin) RPC() interface{} {
+ return kv.NewRPCServer(s, s.log)
+}
+
+// Name returns plugin user-friendly name
+func (s *Plugin) Name() string {
+ return PluginName
+}
+
// ================================== PRIVATE ======================================
func (s *Plugin) gc() {
diff --git a/plugins/kv/memory/storage_test.go b/plugins/kv/memory/storage_test.go
index b7b46637..4b30460d 100644
--- a/plugins/kv/memory/storage_test.go
+++ b/plugins/kv/memory/storage_test.go
@@ -1,7 +1,6 @@
package memory
import (
- "context"
"strconv"
"sync"
"testing"
@@ -16,7 +15,7 @@ func initStorage() kv.Storage {
}
func cleanup(t *testing.T, s kv.Storage, keys ...string) {
- err := s.Delete(context.Background(), keys...)
+ err := s.Delete(keys...)
if err != nil {
t.Fatalf("error during cleanup: %s", err.Error())
}
@@ -25,9 +24,7 @@ func cleanup(t *testing.T, s kv.Storage, keys ...string) {
func TestStorage_Has(t *testing.T) {
s := initStorage()
- ctx := context.Background()
-
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
}
@@ -41,13 +38,12 @@ func TestStorage_Has_Set_Has(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -58,7 +54,7 @@ func TestStorage_Has_Set_Has(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -74,13 +70,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -91,13 +86,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
@@ -111,13 +106,12 @@ func TestStorage_Has_Set_Get(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -128,13 +122,13 @@ func TestStorage_Has_Set_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
- res, err := s.Get(ctx, "key")
+ res, err := s.Get("key")
assert.NoError(t, err)
if string(res) != "value" {
@@ -151,13 +145,12 @@ func TestStorage_Set_Del_Get(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -168,27 +161,26 @@ func TestStorage_Set_Del_Get(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
assert.True(t, v["key2"])
// check that keys are present
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
- assert.NoError(t, s.Delete(ctx, "key", "key2"))
- // check that keys are not presentps -eo state,uid,pid,ppid,rtprio,time,comm
- res, err = s.MGet(ctx, "key", "key2")
+ assert.NoError(t, s.Delete("key", "key2"))
+ // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm
+ res, err = s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 0)
}
func TestStorage_Set_GetM(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
@@ -198,11 +190,11 @@ func TestStorage_Set_GetM(t *testing.T) {
}
}()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: "",
@@ -213,14 +205,13 @@ func TestStorage_Set_GetM(t *testing.T) {
TTL: "",
}))
- res, err := s.MGet(ctx, "key", "key2")
+ res, err := s.MGet("key", "key2")
assert.NoError(t, err)
assert.Len(t, res, 2)
}
func TestStorage_MExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
@@ -230,12 +221,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -258,12 +249,12 @@ func TestStorage_MExpire_TTL(t *testing.T) {
Value: "",
TTL: nowPlusFive,
}
- assert.NoError(t, s.MExpire(ctx, i1, i2))
+ assert.NoError(t, s.MExpire(i1, i2))
time.Sleep(time.Second * 6)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -271,7 +262,6 @@ func TestStorage_MExpire_TTL(t *testing.T) {
func TestNilAndWrongArgs(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
if err := s.Close(); err != nil {
panic(err)
@@ -279,48 +269,47 @@ func TestNilAndWrongArgs(t *testing.T) {
}()
// check
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
assert.False(t, v["key"])
- _, err = s.Has(ctx, "")
+ _, err = s.Has("")
assert.Error(t, err)
- _, err = s.Get(ctx, "")
+ _, err = s.Get("")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.Get(ctx, " ")
+ _, err = s.Get(" ")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", "")
+ _, err = s.MGet("key", "key2", "")
assert.Error(t, err)
- _, err = s.MGet(ctx, "key", "key2", " ")
+ _, err = s.MGet("key", "key2", " ")
assert.Error(t, err)
- assert.NoError(t, s.Set(ctx, kv.Item{}))
- _, err = s.Has(ctx, "key")
+ assert.NoError(t, s.Set(kv.Item{}))
+ _, err = s.Has("key")
assert.NoError(t, err)
- err = s.Delete(ctx, "")
+ err = s.Delete("")
assert.Error(t, err)
- err = s.Delete(ctx, "key", "")
+ err = s.Delete("key", "")
assert.Error(t, err)
- err = s.Delete(ctx, "key", " ")
+ err = s.Delete("key", " ")
assert.Error(t, err)
- err = s.Delete(ctx, "key")
+ err = s.Delete("key")
assert.NoError(t, err)
}
func TestStorage_SetExpire_TTL(t *testing.T) {
s := initStorage()
- ctx := context.Background()
defer func() {
cleanup(t, s, "key", "key2")
if err := s.Close(); err != nil {
@@ -329,12 +318,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}()
// ensure that storage is clean
- v, err := s.Has(ctx, "key", "key2")
+ v, err := s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -348,7 +337,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339)
// set timeout to 5 sec
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "value",
TTL: nowPlusFive,
@@ -360,7 +349,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
}))
time.Sleep(time.Second * 2)
- m, err := s.TTL(ctx, "key", "key2")
+ m, err := s.TTL("key", "key2")
assert.NoError(t, err)
// remove a precision 4.02342342 -> 4
@@ -381,7 +370,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) {
time.Sleep(time.Second * 4)
// ensure that storage is clean
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
assert.False(t, v["key"])
assert.False(t, v["key2"])
@@ -396,13 +385,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
}
}()
- ctx := context.Background()
- v, err := s.Has(ctx, "key")
+ v, err := s.Has("key")
assert.NoError(t, err)
// no such key
assert.False(t, v["key"])
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key",
Value: "hello world",
TTL: "",
@@ -412,7 +400,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
TTL: "",
}))
- v, err = s.Has(ctx, "key", "key2")
+ v, err = s.Has("key", "key2")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -429,7 +417,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
m.Lock()
// set is writable transaction
// it should stop readable
- assert.NoError(t, s.Set(ctx, kv.Item{
+ assert.NoError(t, s.Set(kv.Item{
Key: "key" + strconv.Itoa(i),
Value: "hello world" + strconv.Itoa(i),
TTL: "",
@@ -447,7 +435,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.RLock()
- v, err = s.Has(ctx, "key")
+ v, err = s.Has("key")
assert.NoError(t, err)
// no such key
assert.True(t, v["key"])
@@ -460,7 +448,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) {
defer wg.Done()
for i := 0; i <= 1000; i++ {
m.Lock()
- err = s.Delete(ctx, "key"+strconv.Itoa(i))
+ err = s.Delete("key" + strconv.Itoa(i))
assert.NoError(t, err)
m.Unlock()
}
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
new file mode 100644
index 00000000..751f0d12
--- /dev/null
+++ b/plugins/kv/rpc.go
@@ -0,0 +1,110 @@
+package kv
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// Wrapper for the plugin
+type RPCServer struct {
+ // svc is a plugin implementing Storage interface
+ svc Storage
+ // Logger
+ log logger.Logger
+}
+
+// NewRPCServer construct RPC server for the particular plugin
+func NewRPCServer(srv Storage, log logger.Logger) *RPCServer {
+ return &RPCServer{
+ svc: srv,
+ log: log,
+ }
+}
+
+// data Data
+func (r *RPCServer) Has(in []string, res *map[string]bool) error {
+ const op = errors.Op("rpc server Has")
+ ret, err := r.svc.Has(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // update the value in the pointer
+ *res = ret
+ return nil
+}
+
+// in SetData
+func (r *RPCServer) Set(in []Item, ok *bool) error {
+ const op = errors.Op("rpc server Set")
+
+ err := r.svc.Set(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *ok = true
+ return nil
+}
+
+// in Data
+func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error {
+ const op = errors.Op("rpc server MGet")
+ ret, err := r.svc.MGet(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // update return value
+ *res = ret
+ return nil
+}
+
+// in Data
+func (r *RPCServer) MExpire(in []Item, ok *bool) error {
+ const op = errors.Op("rpc server MExpire")
+
+ err := r.svc.MExpire(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *ok = true
+ return nil
+}
+
+// in Data
+func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error {
+ const op = errors.Op("rpc server TTL")
+
+ ret, err := r.svc.TTL(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *res = ret
+ return nil
+}
+
+// in Data
+func (r *RPCServer) Delete(in []string, ok *bool) error {
+ const op = errors.Op("rpc server Delete")
+ err := r.svc.Delete(in...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+// in string, storages
+func (r *RPCServer) Close(storage string, ok *bool) error {
+ const op = errors.Op("rpc server Close")
+ err := r.svc.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ *ok = true
+
+ return nil
+}