summaryrefslogtreecommitdiff
path: root/plugins/kv
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv')
-rw-r--r--plugins/kv/drivers/boltdb/driver.go17
-rw-r--r--plugins/kv/drivers/memcached/driver.go18
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/drivers/memory/driver.go43
-rw-r--r--plugins/kv/drivers/redis/driver.go20
-rw-r--r--plugins/kv/interface.go14
-rw-r--r--plugins/kv/payload/generated/Item.go67
-rw-r--r--plugins/kv/payload/generated/Payload.go71
-rw-r--r--plugins/kv/payload/payload.fbs14
-rw-r--r--plugins/kv/rpc.go153
10 files changed, 105 insertions, 314 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 0f647cb1..ba873513 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -10,6 +10,7 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -213,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
// Set puts the K/V to the bolt
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -259,14 +260,14 @@ func (d *Driver) Set(items ...kv.Item) error {
// if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
// we do not need mutex here, since we use sync.Map
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check correctness of provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
// Store key TTL in the separate map
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
buf.Reset()
@@ -323,20 +324,20 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
return nil
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 02281ed5..8ea515d0 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,6 +6,7 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -134,14 +135,14 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
- if items[i] == EmptyItem {
+ if items[i] == nil {
return errors.E(op, errors.EmptyItem)
}
@@ -154,9 +155,9 @@ func (d *Driver) Set(items ...kv.Item) error {
}
// add additional TTL in case of TTL isn't empty
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// verify the TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -175,15 +176,18 @@ func (d *Driver) Set(items ...kv.Item) error {
// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index cde84f42..3997e0d4 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -9,8 +9,6 @@ import (
const PluginName = "memcached"
-var EmptyItem = kv.Item{}
-
type Plugin struct {
// config plugin
cfgPlugin config.Configurer
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index c2494ee7..3158adee 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -71,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(*kvv1.Item).Value), nil
}
return nil, nil
}
@@ -94,24 +95,27 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = value.(kv.Item).Value
+ m[keys[i]] = value.(*kvv1.Item).Value
}
}
return m, nil
}
-func (s *Driver) Set(items ...kv.Item) error {
+func (s *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
+ if items[i] == nil {
+ continue
+ }
// TTL is set
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check the TTL in the item
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -124,28 +128,31 @@ func (s *Driver) Set(items ...kv.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...kv.Item) error {
+func (s *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// if key exist, overwrite it value
- if pItem, ok := s.heap.Load(items[i].Key); ok {
+ if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok {
// check that time is correct
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- tmp := pItem.(kv.Item)
+ tmp := pItem.(*kvv1.Item)
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, kv.Item{
- Key: items[i].Key,
- Value: tmp.Value,
- TTL: items[i].TTL,
+ s.heap.Store(items[i].Key, &kvv1.Item{
+ Key: items[i].Key,
+ Value: tmp.Value,
+ Timeout: items[i].Timeout,
})
}
}
@@ -171,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = item.(kv.Item).TTL
+ m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
@@ -209,12 +216,12 @@ func (s *Driver) gc() {
case now := <-ticker.C:
// check every second
s.heap.Range(func(key, value interface{}) bool {
- v := value.(kv.Item)
- if v.TTL == "" {
+ v := value.(*kvv1.Item)
+ if v.Timeout == "" {
return true
}
- t, err := time.Parse(time.RFC3339, v.TTL)
+ t, err := time.Parse(time.RFC3339, v.Timeout)
if err != nil {
return false
}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index d0b541b2..0aaa6352 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -7,13 +7,12 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-var EmptyItem = kv.Item{}
-
type Driver struct {
universalClient redis.UniversalClient
log logger.Logger
@@ -139,24 +138,24 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
//
// Use expiration for `SETEX`-like behavior.
// Zero expiration means the key has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
now := time.Now()
for _, item := range items {
- if item == EmptyItem {
+ if item == nil {
return errors.E(op, errors.EmptyKey)
}
- if item.TTL == "" {
+ if item.Timeout == "" {
err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
if err != nil {
return err
}
} else {
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}
@@ -188,15 +187,18 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire https://redis.io/commands/expire
// timeout in RFC3339
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_mexpire")
now := time.Now()
for _, item := range items {
- if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ if item == nil {
+ continue
+ }
+ if item.Timeout == "" || strings.TrimSpace(item.Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 20dbb8b3..7841f9a2 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,14 +1,6 @@
package kv
-// Item represents general storage item
-type Item struct {
- // Key of item
- Key string
- // Value of item
- Value string
- // live until time provided by TTL in RFC 3339 format
- TTL string
-}
+import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
// Storage represents single abstract storage.
type Storage interface {
@@ -24,10 +16,10 @@ type Storage interface {
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
- Set(items ...Item) error
+ Set(items ...*kvv1.Item) error
// MExpire sets the TTL for multiply keys
- MExpire(items ...Item) error
+ MExpire(items ...*kvv1.Item) error
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go
deleted file mode 100644
index 61bd6024..00000000
--- a/plugins/kv/payload/generated/Item.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package generated
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Item struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Item{}
- x.Init(buf, n+offset)
- return x
-}
-
-func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Item) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Item) Key() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Item) Value() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Item) Timeout() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func ItemStart(builder *flatbuffers.Builder) {
- builder.StartObject(3)
-}
-func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0)
-}
-func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0)
-}
-func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0)
-}
-func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go
deleted file mode 100644
index a2c6cfdb..00000000
--- a/plugins/kv/payload/generated/Payload.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package generated
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Payload struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Payload{}
- x.Init(buf, n+offset)
- return x
-}
-
-func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Payload) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Payload) Storage() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Payload) Items(obj *Item, j int) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- x := rcv._tab.Vector(o)
- x += flatbuffers.UOffsetT(j) * 4
- x = rcv._tab.Indirect(x)
- obj.Init(rcv._tab.Bytes, x)
- return true
- }
- return false
-}
-
-func (rcv *Payload) ItemsLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.VectorLen(o)
- }
- return 0
-}
-
-func PayloadStart(builder *flatbuffers.Builder) {
- builder.StartObject(2)
-}
-func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0)
-}
-func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0)
-}
-func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
- return builder.StartVector(4, numElems, 4)
-}
-func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs
deleted file mode 100644
index 7e02c1a0..00000000
--- a/plugins/kv/payload/payload.fbs
+++ /dev/null
@@ -1,14 +0,0 @@
-namespace generated;
-
-table Payload {
- Storage:string;
- Items:[Item];
-}
-
-table Item {
- Key:string;
- Value:string;
- Timeout:string;
-}
-
-root_type Payload;
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 2d4babbe..a9efe0c4 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,9 +2,8 @@ package kv
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
)
// Wrapper for the plugin
@@ -17,23 +16,21 @@ type rpc struct {
log logger.Logger
}
-// Has accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Has(in []byte, res *map[string]bool) error {
+// Has accept []*payload.Payload proto payload with Storage and Item
+func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
const op = errors.Op("rpc_has")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
+ if in.Storage == "" {
+ return errors.E(op, errors.Str("no storage provided"))
+ }
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, utils.AsString(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok {
+ if st, ok := r.storages[in.Storage]; ok {
ret, err := st.Has(keys...)
if err != nil {
return err
@@ -45,35 +42,15 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Set accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Set(in []byte, ok *bool) error {
+// Set accept proto payload with Storage and Item
+func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_set")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- items := make([]Item, 0, dataRoot.ItemsLength())
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- Value: string(tmpItem.Value()),
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.Set(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.Set(in.GetItems()...)
if err != nil {
return err
}
@@ -84,26 +61,20 @@ func (r *rpc) Set(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MGet accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
+// MGet accept proto payload with Storage and Item
+func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_mget")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
+ keys := make([]string, 0, len(in.GetItems()))
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.MGet(keys...)
if err != nil {
return err
@@ -114,36 +85,15 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MExpire accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MExpire(in []byte, ok *bool) error {
+// MExpire accept proto payload with Storage and Item
+func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_mexpire")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- // when unmarshalling the keys, simultaneously, fill up the slice with items
- items := make([]Item, 0, l)
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- // we set up timeout on the keys, so, value here is redundant
- Value: "",
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.MExpire(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.MExpire(in.GetItems()...)
if err != nil {
return errors.E(op, err)
}
@@ -154,25 +104,19 @@ func (r *rpc) MExpire(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// TTL accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
+// TTL accept proto payload with Storage and Item
+func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_ttl")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.TTL(keys...)
if err != nil {
return err
@@ -183,24 +127,19 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Delete accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Delete(in []byte, ok *bool) error {
+// Delete accept proto payload with Storage and Item
+func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rcp_delete")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
err := st.Delete(keys...)
if err != nil {
return errors.E(op, err)
@@ -212,5 +151,5 @@ func (r *rpc) Delete(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}