summaryrefslogtreecommitdiff
path: root/plugins/kv/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv/rpc.go')
-rw-r--r--plugins/kv/rpc.go153
1 files changed, 46 insertions, 107 deletions
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 2d4babbe..a9efe0c4 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,9 +2,8 @@ package kv
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
)
// Wrapper for the plugin
@@ -17,23 +16,21 @@ type rpc struct {
log logger.Logger
}
-// Has accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Has(in []byte, res *map[string]bool) error {
+// Has accept []*payload.Payload proto payload with Storage and Item
+func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
const op = errors.Op("rpc_has")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
+ if in.Storage == "" {
+ return errors.E(op, errors.Str("no storage provided"))
+ }
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, utils.AsString(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok {
+ if st, ok := r.storages[in.Storage]; ok {
ret, err := st.Has(keys...)
if err != nil {
return err
@@ -45,35 +42,15 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Set accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Set(in []byte, ok *bool) error {
+// Set accept proto payload with Storage and Item
+func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_set")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- items := make([]Item, 0, dataRoot.ItemsLength())
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- Value: string(tmpItem.Value()),
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.Set(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.Set(in.GetItems()...)
if err != nil {
return err
}
@@ -84,26 +61,20 @@ func (r *rpc) Set(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MGet accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
+// MGet accept proto payload with Storage and Item
+func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_mget")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
+ keys := make([]string, 0, len(in.GetItems()))
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.MGet(keys...)
if err != nil {
return err
@@ -114,36 +85,15 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MExpire accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MExpire(in []byte, ok *bool) error {
+// MExpire accept proto payload with Storage and Item
+func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_mexpire")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- // when unmarshalling the keys, simultaneously, fill up the slice with items
- items := make([]Item, 0, l)
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- // we set up timeout on the keys, so, value here is redundant
- Value: "",
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.MExpire(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.MExpire(in.GetItems()...)
if err != nil {
return errors.E(op, err)
}
@@ -154,25 +104,19 @@ func (r *rpc) MExpire(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// TTL accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
+// TTL accept proto payload with Storage and Item
+func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_ttl")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.TTL(keys...)
if err != nil {
return err
@@ -183,24 +127,19 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Delete accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Delete(in []byte, ok *bool) error {
+// Delete accept proto payload with Storage and Item
+func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rcp_delete")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
err := st.Delete(keys...)
if err != nil {
return errors.E(op, err)
@@ -212,5 +151,5 @@ func (r *rpc) Delete(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}