diff options
author | Valery Piashchynski <[email protected]> | 2021-01-07 01:06:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-07 01:06:50 +0300 |
commit | c1465d3bcdf24a78440300aa51e7cfc92ce874a8 (patch) | |
tree | 6e0f5107eba90df73724b6611ca6adfa148d2a3f /plugins | |
parent | c9f670ee734355cbc5d504186946b7db67cf62b5 (diff) |
KV, updated, bug fixed, with intergration tests via plugins
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/kv/boltdb/config.go | 11 | ||||
-rw-r--r-- | plugins/kv/boltdb/plugin.go | 148 | ||||
-rw-r--r-- | plugins/kv/boltdb/plugin_unit_test.go | 172 | ||||
-rw-r--r-- | plugins/kv/interface.go | 20 | ||||
-rw-r--r-- | plugins/kv/memcached/plugin.go | 25 | ||||
-rw-r--r-- | plugins/kv/memcached/storage_test.go | 108 | ||||
-rw-r--r-- | plugins/kv/memory/plugin.go (renamed from plugins/kv/memory/storage.go) | 31 | ||||
-rw-r--r-- | plugins/kv/memory/storage_test.go | 114 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 110 |
9 files changed, 431 insertions, 308 deletions
diff --git a/plugins/kv/boltdb/config.go b/plugins/kv/boltdb/config.go index 6b116611..b2e1e636 100644 --- a/plugins/kv/boltdb/config.go +++ b/plugins/kv/boltdb/config.go @@ -8,16 +8,17 @@ type Config struct { File string // Bucket to store data in boltDB Bucket string - + // db file permissions Permissions int - TTL int + // timeout + Interval uint `yaml:"interval"` } -func (s *Config) InitDefaults() error { +// InitDefaults initializes default values for the boltdb +func (s *Config) InitDefaults() { s.Dir = "." // current dir s.Bucket = "rr" // default bucket name s.File = "rr.db" // default file name s.Permissions = 0777 // free for all - s.TTL = 60 // 60 seconds is default TTL - return nil + s.Interval = 60 // default is 60 seconds timeout } diff --git a/plugins/kv/boltdb/plugin.go b/plugins/kv/boltdb/plugin.go index e5eda0c2..6cfc49f6 100644 --- a/plugins/kv/boltdb/plugin.go +++ b/plugins/kv/boltdb/plugin.go @@ -2,7 +2,6 @@ package boltdb import ( "bytes" - "context" "encoding/gob" "os" "path" @@ -41,78 +40,23 @@ type Plugin struct { stop chan struct{} } -// NewBoltClient instantiate new BOLTDB client -// The parameters are: -// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created -// perm os.FileMode -- file permissions, for example 0777 -// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other -// bucket string -- name of the bucket to use, should be UTF-8 -func NewBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) { - const op = errors.Op("newBoltClient") - db, err := bolt.Open(path, perm, options) - if err != nil { - return nil, errors.E(op, err) - } - - // bucket should be SET - if bucket == "" { - return nil, errors.E(op, errors.Str("bucket should be set")) - } - - // create bucket if it does not exist - // tx.Commit invokes via the db.Update - err = db.Update(func(tx *bolt.Tx) error { - _, err = tx.CreateBucketIfNotExists([]byte(bucket)) - if err != nil { - return errors.E(op, err) - } - return nil - }) - if err != nil { - return nil, errors.E(op, err) - } - - // if TTL is not set, make it default - if ttl == 0 { - ttl = time.Minute - } - - s := &Plugin{ - DB: db, - bucket: []byte(bucket), - stop: make(chan struct{}), - timeout: ttl, - gc: &sync.Map{}, - } - - // start the TTL gc - go s.gcPhase() - - return s, nil -} - func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error { const op = errors.Op("boltdb plugin init") s.cfg = &Config{} + s.cfg.InitDefaults() + err := cfg.UnmarshalKey(PluginName, &s.cfg) if err != nil { return errors.E(op, errors.Disabled, err) } + // set the logger s.log = log - return nil -} - -func (s *Plugin) Serve() chan error { - const op = errors.Op("boltdb serve") - errCh := make(chan error, 1) - db, err := bolt.Open(path.Join(s.cfg.Dir, s.cfg.File), os.FileMode(s.cfg.Permissions), nil) if err != nil { - errCh <- errors.E(op, err) - return errCh + return errors.E(op, err) } // create bucket if it does not exist @@ -125,24 +69,29 @@ func (s *Plugin) Serve() chan error { } return nil }) + if err != nil { - errCh <- err - return errCh + return errors.E(op, err) } s.DB = db s.bucket = []byte(s.cfg.Bucket) s.stop = make(chan struct{}) - s.timeout = time.Duration(s.cfg.TTL) * time.Second + s.timeout = time.Duration(s.cfg.Interval) * time.Second s.gc = &sync.Map{} + return nil +} + +func (s *Plugin) Serve() chan error { + errCh := make(chan error, 1) // start the TTL gc go s.gcPhase() return errCh } -func (s Plugin) Stop() error { +func (s *Plugin) Stop() error { const op = errors.Op("boltdb stop") err := s.Close() if err != nil { @@ -151,8 +100,9 @@ func (s Plugin) Stop() error { return nil } -func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) { +func (s *Plugin) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("boltdb Has") + s.log.Debug("boltdb HAS method called", "args", keys) if keys == nil { return nil, errors.E(op, errors.NoKeys) } @@ -164,8 +114,8 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error // Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) if keyTrimmed == "" { return errors.E(op, errors.EmptyKey) } @@ -173,24 +123,25 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error if b == nil { return errors.E(op, errors.NoSuchBucket) } - exist := b.Get([]byte(key)) + exist := b.Get([]byte(keys[i])) if exist != nil { - m[key] = true + m[keys[i]] = true } } return nil }) if err != nil { - return nil, err + return nil, errors.E(op, err) } + s.log.Debug("boltdb HAS method finished") return m, nil } // Get retrieves the value for a key in the bucket. // Returns a nil value if the key does not exist or if the key is a nested bucket. // The returned value is only valid for the life of the transaction. -func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { +func (s *Plugin) Get(key string) ([]byte, error) { const op = errors.Op("boltdb Get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -211,7 +162,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { buf := bytes.NewReader(val) decoder := gob.NewDecoder(buf) - i := kv.Item{} + var i string err := decoder.Decode(&i) if err != nil { // unsafe (w/o runes) convert @@ -219,7 +170,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { } // set the value - val = []byte(i.Value) + val = []byte(i) } return nil }) @@ -230,7 +181,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { return val, nil } -func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("boltdb MGet") // defence if keys == nil { @@ -238,8 +189,8 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ } // should not be empty keys - for _, key := range keys { - keyTrimmed := strings.TrimSpace(key) + for i := range keys { + keyTrimmed := strings.TrimSpace(keys[i]) if keyTrimmed == "" { return nil, errors.E(op, errors.EmptyKey) } @@ -253,10 +204,22 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ return errors.E(op, errors.NoSuchBucket) } - for _, key := range keys { - value := b.Get([]byte(key)) + buf := new(bytes.Buffer) + var out string + buf.Grow(100) + for i := range keys { + value := b.Get([]byte(keys[i])) + buf.Write(value) + // allocate enough space + dec := gob.NewDecoder(buf) if value != nil { - m[key] = value + err := dec.Decode(&out) + if err != nil { + return errors.E(op, err) + } + m[keys[i]] = out + buf.Reset() + out = "" } } @@ -270,7 +233,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ } // Set puts the K/V to the bolt -func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) Set(items ...kv.Item) error { const op = errors.Op("boltdb Set") if items == nil { return errors.E(op, errors.NoKeys) @@ -303,7 +266,8 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { return errors.E(op, errors.EmptyItem) } - err = encoder.Encode(&items[i]) + // Encode value + err = encoder.Encode(&items[i].Value) if err != nil { return errors.E(op, err) } @@ -321,6 +285,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { if err != nil { return errors.E(op, err) } + // Store key TTL in the separate map s.gc.Store(items[i].Key, items[i].TTL) } @@ -331,7 +296,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { } // Delete all keys from DB -func (s Plugin) Delete(ctx context.Context, keys ...string) error { +func (s *Plugin) Delete(keys ...string) error { const op = errors.Op("boltdb Delete") if keys == nil { return errors.E(op, errors.NoKeys) @@ -378,7 +343,7 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) MExpire(items ...kv.Item) error { const op = errors.Op("boltdb MExpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { @@ -396,7 +361,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { return nil } -func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { const op = errors.Op("boltdb TTL") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -422,15 +387,25 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{} } // Close the DB connection -func (s Plugin) Close() error { +func (s *Plugin) Close() error { // stop the keys GC s.stop <- struct{}{} return s.DB.Close() } +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin name +func (s *Plugin) Name() string { + return PluginName +} + // ========================= PRIVATE ================================= -func (s Plugin) gcPhase() { +func (s *Plugin) gcPhase() { t := time.NewTicker(s.timeout) defer t.Stop() for { @@ -449,6 +424,7 @@ func (s Plugin) gcPhase() { if now.After(v) { // time expired s.gc.Delete(k) + s.log.Debug("key deleted", "key", k) err := s.DB.Update(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucket) if b == nil { diff --git a/plugins/kv/boltdb/plugin_unit_test.go b/plugins/kv/boltdb/plugin_unit_test.go index a12c830d..2459e493 100644 --- a/plugins/kv/boltdb/plugin_unit_test.go +++ b/plugins/kv/boltdb/plugin_unit_test.go @@ -1,19 +1,74 @@ package boltdb import ( - "context" "os" "strconv" "sync" "testing" "time" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/stretchr/testify/assert" + bolt "go.etcd.io/bbolt" + "go.uber.org/zap" ) +// NewBoltClient instantiate new BOLTDB client +// The parameters are: +// path string -- path to database file (can be placed anywhere), if file is not exist, it will be created +// perm os.FileMode -- file permissions, for example 0777 +// options *bolt.Options -- boltDB options, such as timeouts, noGrows options and other +// bucket string -- name of the bucket to use, should be UTF-8 +func newBoltClient(path string, perm os.FileMode, options *bolt.Options, bucket string, ttl time.Duration) (kv.Storage, error) { + const op = errors.Op("newBoltClient") + db, err := bolt.Open(path, perm, options) + if err != nil { + return nil, errors.E(op, err) + } + + // bucket should be SET + if bucket == "" { + return nil, errors.E(op, errors.Str("bucket should be set")) + } + + // create bucket if it does not exist + // tx.Commit invokes via the db.Update + err = db.Update(func(tx *bolt.Tx) error { + _, err = tx.CreateBucketIfNotExists([]byte(bucket)) + if err != nil { + return errors.E(op, err) + } + return nil + }) + if err != nil { + return nil, errors.E(op, err) + } + + // if TTL is not set, make it default + if ttl == 0 { + ttl = time.Minute + } + + l, _ := zap.NewDevelopment() + s := &Plugin{ + DB: db, + bucket: []byte(bucket), + stop: make(chan struct{}), + timeout: ttl, + gc: &sync.Map{}, + log: logger.NewZapAdapter(l), + } + + // start the TTL gc + go s.gcPhase() + + return s, nil +} + func initStorage() kv.Storage { - storage, err := NewBoltClient("rr.db", 0777, nil, "rr", time.Second) + storage, err := newBoltClient("rr.db", 0777, nil, "rr", time.Second) if err != nil { panic(err) } @@ -36,7 +91,7 @@ func TestStorage_Has(t *testing.T) { cleanup(t, "rr.db") }() - v, err := s.Has(context.Background(), "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) } @@ -50,13 +105,12 @@ func TestStorage_Has_Set_Has(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -66,7 +120,7 @@ func TestStorage_Has_Set_Has(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -82,13 +136,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -98,7 +151,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -115,7 +168,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { m.Lock() // set is writable transaction // it should stop readable - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key" + strconv.Itoa(i), Value: "hello world" + strconv.Itoa(i), TTL: "", @@ -133,7 +186,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.RLock() - v, err = s.Has(ctx, "key") + v, err = s.Has("key") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -146,7 +199,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.Lock() - err = s.Delete(ctx, "key"+strconv.Itoa(i)) + err = s.Delete("key" + strconv.Itoa(i)) assert.NoError(t, err) m.Unlock() } @@ -164,13 +217,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -180,13 +232,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } @@ -200,13 +252,12 @@ func TestStorage_Has_Set_Get(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -216,13 +267,13 @@ func TestStorage_Has_Set_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) - // no such key + assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.Get(ctx, "key") + res, err := s.Get("key") assert.NoError(t, err) if string(res) != "hello world" { @@ -239,13 +290,12 @@ func TestStorage_Set_Del_Get(t *testing.T) { cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -255,20 +305,20 @@ func TestStorage_Set_Del_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) // check that keys are present - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) - assert.NoError(t, s.Delete(ctx, "key", "key2")) + assert.NoError(t, s.Delete("key", "key2")) // check that keys are not present - res, err = s.MGet(ctx, "key", "key2") + res, err = s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 0) } @@ -281,13 +331,12 @@ func TestStorage_Set_GetM(t *testing.T) { } cleanup(t, "rr.db") }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -297,14 +346,13 @@ func TestStorage_Set_GetM(t *testing.T) { TTL: "", })) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } func TestNilAndWrongArgs(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { if err := s.Close(); err != nil { panic(err) @@ -313,61 +361,60 @@ func TestNilAndWrongArgs(t *testing.T) { }() // check - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - _, err = s.Has(ctx, "") + _, err = s.Has("") assert.Error(t, err) - _, err = s.Get(ctx, "") + _, err = s.Get("") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", "") + _, err = s.MGet("key", "key2", "") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", " ") + _, err = s.MGet("key", "key2", " ") assert.Error(t, err) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", })) - assert.Error(t, s.Set(ctx, kv.Item{ + assert.Error(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "asdf", })) - _, err = s.Has(ctx, "key") + _, err = s.Has("key") assert.NoError(t, err) - assert.Error(t, s.Set(ctx, kv.Item{})) + assert.Error(t, s.Set(kv.Item{})) - err = s.Delete(ctx, "") + err = s.Delete("") assert.Error(t, err) - err = s.Delete(ctx, "key", "") + err = s.Delete("key", "") assert.Error(t, err) - err = s.Delete(ctx, "key", " ") + err = s.Delete("key", " ") assert.Error(t, err) - err = s.Delete(ctx, "key") + err = s.Delete("key") assert.NoError(t, err) } func TestStorage_MExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { if err := s.Close(); err != nil { panic(err) @@ -376,12 +423,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -404,12 +451,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { Value: "", TTL: nowPlusFive, } - assert.NoError(t, s.MExpire(ctx, i1, i2)) + assert.NoError(t, s.MExpire(i1, i2)) time.Sleep(time.Second * 6) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -417,7 +464,6 @@ func TestStorage_MExpire_TTL(t *testing.T) { func TestStorage_SetExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { if err := s.Close(); err != nil { panic(err) @@ -426,12 +472,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -445,7 +491,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) // set timeout to 5 sec - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: nowPlusFive, @@ -457,7 +503,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { })) time.Sleep(time.Second * 2) - m, err := s.TTL(ctx, "key", "key2") + m, err := s.TTL("key", "key2") assert.NoError(t, err) // remove a precision 4.02342342 -> 4 @@ -478,7 +524,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { time.Sleep(time.Second * 4) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 3512fd73..c1367cdf 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,10 +1,6 @@ package kv // Item represents general storage item -import ( - "context" -) - type Item struct { // Key of item Key string @@ -17,28 +13,28 @@ type Item struct { // Storage represents single abstract storage. type Storage interface { // Has checks if value exists. - Has(ctx context.Context, keys ...string) (map[string]bool, error) + Has(keys ...string) (map[string]bool, error) // Get loads value content into a byte slice. - Get(ctx context.Context, key string) ([]byte, error) + Get(key string) ([]byte, error) // MGet loads content of multiple values - // If there are no values for keys, key will no be in the map - MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) + // Returns the map with existing keys and associated values + MGet(keys ...string) (map[string]interface{}, error) // Set used to upload item to KV with TTL // 0 value in TTL means no TTL - Set(ctx context.Context, items ...Item) error + Set(items ...Item) error // MExpire sets the TTL for multiply keys - MExpire(ctx context.Context, items ...Item) error + MExpire(items ...Item) error // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb - TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) + TTL(keys ...string) (map[string]interface{}, error) // Delete one or multiple keys. - Delete(ctx context.Context, keys ...string) error + Delete(keys ...string) error // Close closes the storage and underlying resources. Close() error diff --git a/plugins/kv/memcached/plugin.go b/plugins/kv/memcached/plugin.go index 69f96bfe..f5111c04 100644 --- a/plugins/kv/memcached/plugin.go +++ b/plugins/kv/memcached/plugin.go @@ -1,7 +1,6 @@ package memcached import ( - "context" "strings" "time" @@ -58,8 +57,18 @@ func (s *Plugin) Stop() error { return nil } +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} + // Has checks the key for existence -func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) { +func (s Plugin) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("memcached Has") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -84,7 +93,7 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error // Get gets the item for the given key. ErrCacheMiss is returned for a // memcache cache miss. The key must be at most 250 bytes in length. -func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { +func (s Plugin) Get(key string) ([]byte, error) { const op = errors.Op("memcached Get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -106,7 +115,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { // return map with key -- string // and map value as value -- []byte -func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("memcached MGet") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -141,7 +150,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { +func (s Plugin) Set(items ...kv.Item) error { const op = errors.Op("memcached Set") if items == nil { return errors.E(op, errors.NoKeys) @@ -182,7 +191,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { +func (s Plugin) MExpire(items ...kv.Item) error { const op = errors.Op("memcached MExpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { @@ -209,12 +218,12 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { } // return time in seconds (int32) for a given keys -func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s Plugin) TTL(keys ...string) (map[string]interface{}, error) { const op = errors.Op("memcached HTTLas") return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) } -func (s Plugin) Delete(ctx context.Context, keys ...string) error { +func (s Plugin) Delete(keys ...string) error { const op = errors.Op("memcached Has") if keys == nil { return errors.E(op, errors.NoKeys) diff --git a/plugins/kv/memcached/storage_test.go b/plugins/kv/memcached/storage_test.go index 4b59bbd0..3d37748b 100644 --- a/plugins/kv/memcached/storage_test.go +++ b/plugins/kv/memcached/storage_test.go @@ -1,7 +1,6 @@ package memcached import ( - "context" "strconv" "sync" "testing" @@ -16,7 +15,7 @@ func initStorage() kv.Storage { } func cleanup(t *testing.T, s kv.Storage, keys ...string) { - err := s.Delete(context.Background(), keys...) + err := s.Delete(keys...) if err != nil { t.Fatalf("error during cleanup: %s", err.Error()) } @@ -25,9 +24,7 @@ func cleanup(t *testing.T, s kv.Storage, keys ...string) { func TestStorage_Has(t *testing.T) { s := initStorage() - ctx := context.Background() - - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) } @@ -41,13 +38,12 @@ func TestStorage_Has_Set_Has(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -57,7 +53,7 @@ func TestStorage_Has_Set_Has(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -73,13 +69,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -89,13 +84,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } @@ -109,13 +104,12 @@ func TestStorage_Has_Set_Get(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -125,13 +119,13 @@ func TestStorage_Has_Set_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.Get(ctx, "key") + res, err := s.Get("key") assert.NoError(t, err) if string(res) != "hello world" { @@ -148,13 +142,12 @@ func TestStorage_Set_Del_Get(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -164,27 +157,26 @@ func TestStorage_Set_Del_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) // check that keys are present - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) - assert.NoError(t, s.Delete(ctx, "key", "key2")) + assert.NoError(t, s.Delete("key", "key2")) // check that keys are not present - res, err = s.MGet(ctx, "key", "key2") + res, err = s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 0) } func TestStorage_Set_GetM(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") @@ -194,11 +186,11 @@ func TestStorage_Set_GetM(t *testing.T) { } }() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -208,14 +200,13 @@ func TestStorage_Set_GetM(t *testing.T) { TTL: "", })) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } func TestStorage_MExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") if err := s.Close(); err != nil { @@ -224,12 +215,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -252,12 +243,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { Value: "", TTL: nowPlusFive, } - assert.NoError(t, s.MExpire(ctx, i1, i2)) + assert.NoError(t, s.MExpire(i1, i2)) time.Sleep(time.Second * 6) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -265,7 +256,6 @@ func TestStorage_MExpire_TTL(t *testing.T) { func TestNilAndWrongArgs(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key") if err := s.Close(); err != nil { @@ -274,46 +264,45 @@ func TestNilAndWrongArgs(t *testing.T) { }() // check - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - _, err = s.Has(ctx, "") + _, err = s.Has("") assert.Error(t, err) - _, err = s.Get(ctx, "") + _, err = s.Get("") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", "") + _, err = s.MGet("key", "key2", "") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", " ") + _, err = s.MGet("key", "key2", " ") assert.Error(t, err) - assert.Error(t, s.Set(ctx, kv.Item{})) + assert.Error(t, s.Set(kv.Item{})) - err = s.Delete(ctx, "") + err = s.Delete("") assert.Error(t, err) - err = s.Delete(ctx, "key", "") + err = s.Delete("key", "") assert.Error(t, err) - err = s.Delete(ctx, "key", " ") + err = s.Delete("key", " ") assert.Error(t, err) - err = s.Delete(ctx, "key") + err = s.Delete("key") assert.NoError(t, err) } func TestStorage_SetExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") if err := s.Close(); err != nil { @@ -322,12 +311,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -341,7 +330,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) // set timeout to 5 sec - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: nowPlusFive, @@ -355,7 +344,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { time.Sleep(time.Second * 6) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -370,13 +359,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -386,7 +374,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -403,7 +391,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { m.Lock() // set is writable transaction // it should stop readable - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key" + strconv.Itoa(i), Value: "hello world" + strconv.Itoa(i), TTL: "", @@ -421,7 +409,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.RLock() - v, err = s.Has(ctx, "key") + v, err = s.Has("key") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -434,7 +422,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.Lock() - err = s.Delete(ctx, "key"+strconv.Itoa(i)) + err = s.Delete("key" + strconv.Itoa(i)) assert.NoError(t, err) m.Unlock() } diff --git a/plugins/kv/memory/storage.go b/plugins/kv/memory/plugin.go index f4bdacea..2c65f14c 100644 --- a/plugins/kv/memory/storage.go +++ b/plugins/kv/memory/plugin.go @@ -1,7 +1,6 @@ package memory import ( - "context" "strings" "sync" "time" @@ -49,7 +48,7 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return nil } -func (s Plugin) Serve() chan error { +func (s *Plugin) Serve() chan error { errCh := make(chan error, 1) // start in-memory gc for kv go s.gc() @@ -57,7 +56,7 @@ func (s Plugin) Serve() chan error { return errCh } -func (s Plugin) Stop() error { +func (s *Plugin) Stop() error { const op = errors.Op("in-memory storage stop") err := s.Close() if err != nil { @@ -66,7 +65,7 @@ func (s Plugin) Stop() error { return nil } -func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error) { +func (s *Plugin) Has(keys ...string) (map[string]bool, error) { const op = errors.Op("in-memory storage Has") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -86,7 +85,7 @@ func (s Plugin) Has(ctx context.Context, keys ...string) (map[string]bool, error return m, nil } -func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { +func (s *Plugin) Get(key string) ([]byte, error) { const op = errors.Op("in-memory storage Get") // to get cases like " " keyTrimmed := strings.TrimSpace(key) @@ -102,7 +101,7 @@ func (s Plugin) Get(ctx context.Context, key string) ([]byte, error) { return nil, nil } -func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) MGet(keys ...string) (map[string]interface{}, error) { const op = errors.Op("in-memory storage MGet") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -127,7 +126,7 @@ func (s Plugin) MGet(ctx context.Context, keys ...string) (map[string]interface{ return m, nil } -func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) Set(items ...kv.Item) error { const op = errors.Op("in-memory storage Set") if items == nil { return errors.E(op, errors.NoKeys) @@ -150,7 +149,7 @@ func (s Plugin) Set(ctx context.Context, items ...kv.Item) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { +func (s *Plugin) MExpire(items ...kv.Item) error { const op = errors.Op("in-memory storage MExpire") for i := range items { if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { @@ -179,7 +178,7 @@ func (s Plugin) MExpire(ctx context.Context, items ...kv.Item) error { return nil } -func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{}, error) { +func (s *Plugin) TTL(keys ...string) (map[string]interface{}, error) { const op = errors.Op("in-memory storage TTL") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -203,7 +202,7 @@ func (s Plugin) TTL(ctx context.Context, keys ...string) (map[string]interface{} return m, nil } -func (s Plugin) Delete(ctx context.Context, keys ...string) error { +func (s *Plugin) Delete(keys ...string) error { const op = errors.Op("in-memory storage Delete") if keys == nil { return errors.E(op, errors.NoKeys) @@ -224,11 +223,21 @@ func (s Plugin) Delete(ctx context.Context, keys ...string) error { } // Close clears the in-memory storage -func (s Plugin) Close() error { +func (s *Plugin) Close() error { s.stop <- struct{}{} return nil } +// RPCService returns associated rpc service. +func (s *Plugin) RPC() interface{} { + return kv.NewRPCServer(s, s.log) +} + +// Name returns plugin user-friendly name +func (s *Plugin) Name() string { + return PluginName +} + // ================================== PRIVATE ====================================== func (s *Plugin) gc() { diff --git a/plugins/kv/memory/storage_test.go b/plugins/kv/memory/storage_test.go index b7b46637..4b30460d 100644 --- a/plugins/kv/memory/storage_test.go +++ b/plugins/kv/memory/storage_test.go @@ -1,7 +1,6 @@ package memory import ( - "context" "strconv" "sync" "testing" @@ -16,7 +15,7 @@ func initStorage() kv.Storage { } func cleanup(t *testing.T, s kv.Storage, keys ...string) { - err := s.Delete(context.Background(), keys...) + err := s.Delete(keys...) if err != nil { t.Fatalf("error during cleanup: %s", err.Error()) } @@ -25,9 +24,7 @@ func cleanup(t *testing.T, s kv.Storage, keys ...string) { func TestStorage_Has(t *testing.T) { s := initStorage() - ctx := context.Background() - - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) } @@ -41,13 +38,12 @@ func TestStorage_Has_Set_Has(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -58,7 +54,7 @@ func TestStorage_Has_Set_Has(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -74,13 +70,12 @@ func TestStorage_Has_Set_MGet(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -91,13 +86,13 @@ func TestStorage_Has_Set_MGet(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } @@ -111,13 +106,12 @@ func TestStorage_Has_Set_Get(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -128,13 +122,13 @@ func TestStorage_Has_Set_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) - res, err := s.Get(ctx, "key") + res, err := s.Get("key") assert.NoError(t, err) if string(res) != "value" { @@ -151,13 +145,12 @@ func TestStorage_Set_Del_Get(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -168,27 +161,26 @@ func TestStorage_Set_Del_Get(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) assert.True(t, v["key2"]) // check that keys are present - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) - assert.NoError(t, s.Delete(ctx, "key", "key2")) - // check that keys are not presentps -eo state,uid,pid,ppid,rtprio,time,comm - res, err = s.MGet(ctx, "key", "key2") + assert.NoError(t, s.Delete("key", "key2")) + // check that keys are not presents -eo state,uid,pid,ppid,rtprio,time,comm + res, err = s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 0) } func TestStorage_Set_GetM(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") @@ -198,11 +190,11 @@ func TestStorage_Set_GetM(t *testing.T) { } }() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: "", @@ -213,14 +205,13 @@ func TestStorage_Set_GetM(t *testing.T) { TTL: "", })) - res, err := s.MGet(ctx, "key", "key2") + res, err := s.MGet("key", "key2") assert.NoError(t, err) assert.Len(t, res, 2) } func TestStorage_MExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") @@ -230,12 +221,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -258,12 +249,12 @@ func TestStorage_MExpire_TTL(t *testing.T) { Value: "", TTL: nowPlusFive, } - assert.NoError(t, s.MExpire(ctx, i1, i2)) + assert.NoError(t, s.MExpire(i1, i2)) time.Sleep(time.Second * 6) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -271,7 +262,6 @@ func TestStorage_MExpire_TTL(t *testing.T) { func TestNilAndWrongArgs(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { if err := s.Close(); err != nil { panic(err) @@ -279,48 +269,47 @@ func TestNilAndWrongArgs(t *testing.T) { }() // check - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) assert.False(t, v["key"]) - _, err = s.Has(ctx, "") + _, err = s.Has("") assert.Error(t, err) - _, err = s.Get(ctx, "") + _, err = s.Get("") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.Get(ctx, " ") + _, err = s.Get(" ") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", "") + _, err = s.MGet("key", "key2", "") assert.Error(t, err) - _, err = s.MGet(ctx, "key", "key2", " ") + _, err = s.MGet("key", "key2", " ") assert.Error(t, err) - assert.NoError(t, s.Set(ctx, kv.Item{})) - _, err = s.Has(ctx, "key") + assert.NoError(t, s.Set(kv.Item{})) + _, err = s.Has("key") assert.NoError(t, err) - err = s.Delete(ctx, "") + err = s.Delete("") assert.Error(t, err) - err = s.Delete(ctx, "key", "") + err = s.Delete("key", "") assert.Error(t, err) - err = s.Delete(ctx, "key", " ") + err = s.Delete("key", " ") assert.Error(t, err) - err = s.Delete(ctx, "key") + err = s.Delete("key") assert.NoError(t, err) } func TestStorage_SetExpire_TTL(t *testing.T) { s := initStorage() - ctx := context.Background() defer func() { cleanup(t, s, "key", "key2") if err := s.Close(); err != nil { @@ -329,12 +318,12 @@ func TestStorage_SetExpire_TTL(t *testing.T) { }() // ensure that storage is clean - v, err := s.Has(ctx, "key", "key2") + v, err := s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -348,7 +337,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { nowPlusFive := time.Now().Add(time.Second * 5).Format(time.RFC3339) // set timeout to 5 sec - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "value", TTL: nowPlusFive, @@ -360,7 +349,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { })) time.Sleep(time.Second * 2) - m, err := s.TTL(ctx, "key", "key2") + m, err := s.TTL("key", "key2") assert.NoError(t, err) // remove a precision 4.02342342 -> 4 @@ -381,7 +370,7 @@ func TestStorage_SetExpire_TTL(t *testing.T) { time.Sleep(time.Second * 4) // ensure that storage is clean - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) assert.False(t, v["key"]) assert.False(t, v["key2"]) @@ -396,13 +385,12 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { } }() - ctx := context.Background() - v, err := s.Has(ctx, "key") + v, err := s.Has("key") assert.NoError(t, err) // no such key assert.False(t, v["key"]) - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key", Value: "hello world", TTL: "", @@ -412,7 +400,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { TTL: "", })) - v, err = s.Has(ctx, "key", "key2") + v, err = s.Has("key", "key2") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -429,7 +417,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { m.Lock() // set is writable transaction // it should stop readable - assert.NoError(t, s.Set(ctx, kv.Item{ + assert.NoError(t, s.Set(kv.Item{ Key: "key" + strconv.Itoa(i), Value: "hello world" + strconv.Itoa(i), TTL: "", @@ -447,7 +435,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.RLock() - v, err = s.Has(ctx, "key") + v, err = s.Has("key") assert.NoError(t, err) // no such key assert.True(t, v["key"]) @@ -460,7 +448,7 @@ func TestConcurrentReadWriteTransactions(t *testing.T) { defer wg.Done() for i := 0; i <= 1000; i++ { m.Lock() - err = s.Delete(ctx, "key"+strconv.Itoa(i)) + err = s.Delete("key" + strconv.Itoa(i)) assert.NoError(t, err) m.Unlock() } diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go new file mode 100644 index 00000000..751f0d12 --- /dev/null +++ b/plugins/kv/rpc.go @@ -0,0 +1,110 @@ +package kv + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// Wrapper for the plugin +type RPCServer struct { + // svc is a plugin implementing Storage interface + svc Storage + // Logger + log logger.Logger +} + +// NewRPCServer construct RPC server for the particular plugin +func NewRPCServer(srv Storage, log logger.Logger) *RPCServer { + return &RPCServer{ + svc: srv, + log: log, + } +} + +// data Data +func (r *RPCServer) Has(in []string, res *map[string]bool) error { + const op = errors.Op("rpc server Has") + ret, err := r.svc.Has(in...) + if err != nil { + return errors.E(op, err) + } + + // update the value in the pointer + *res = ret + return nil +} + +// in SetData +func (r *RPCServer) Set(in []Item, ok *bool) error { + const op = errors.Op("rpc server Set") + + err := r.svc.Set(in...) + if err != nil { + return errors.E(op, err) + } + + *ok = true + return nil +} + +// in Data +func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error { + const op = errors.Op("rpc server MGet") + ret, err := r.svc.MGet(in...) + if err != nil { + return errors.E(op, err) + } + + // update return value + *res = ret + return nil +} + +// in Data +func (r *RPCServer) MExpire(in []Item, ok *bool) error { + const op = errors.Op("rpc server MExpire") + + err := r.svc.MExpire(in...) + if err != nil { + return errors.E(op, err) + } + + *ok = true + return nil +} + +// in Data +func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error { + const op = errors.Op("rpc server TTL") + + ret, err := r.svc.TTL(in...) + if err != nil { + return errors.E(op, err) + } + + *res = ret + return nil +} + +// in Data +func (r *RPCServer) Delete(in []string, ok *bool) error { + const op = errors.Op("rpc server Delete") + err := r.svc.Delete(in...) + if err != nil { + return errors.E(op, err) + } + *ok = true + return nil +} + +// in string, storages +func (r *RPCServer) Close(storage string, ok *bool) error { + const op = errors.Op("rpc server Close") + err := r.svc.Close() + if err != nil { + return errors.E(op, err) + } + *ok = true + + return nil +} |