diff options
Diffstat (limited to 'plugins/kv/rpc.go')
-rw-r--r-- | plugins/kv/rpc.go | 153 |
1 files changed, 46 insertions, 107 deletions
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 2d4babbe..a9efe0c4 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,9 +2,8 @@ package kv import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" ) // Wrapper for the plugin @@ -17,23 +16,21 @@ type rpc struct { log logger.Logger } -// Has accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Has(in []byte, res *map[string]bool) error { +// Has accept []*payload.Payload proto payload with Storage and Item +func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { const op = errors.Op("rpc_has") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) + if in.Storage == "" { + return errors.E(op, errors.Str("no storage provided")) + } - tmpItem := &generated.Item{} - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, utils.AsString(tmpItem.Key())) + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok { + if st, ok := r.storages[in.Storage]; ok { ret, err := st.Has(keys...) if err != nil { return err @@ -45,35 +42,15 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// Set accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Set(in []byte, ok *bool) error { +// Set accept proto payload with Storage and Item +func (r *rpc) Set(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_set") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - - items := make([]Item, 0, dataRoot.ItemsLength()) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - - itc := Item{ - Key: string(tmpItem.Key()), - Value: string(tmpItem.Value()), - TTL: string(tmpItem.Timeout()), - } - - items = append(items, itc) - } - - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { - err := st.Set(items...) + if st, exists := r.storages[in.GetStorage()]; exists { + err := st.Set(in.GetItems()...) if err != nil { return err } @@ -84,26 +61,20 @@ func (r *rpc) Set(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// MGet accept []byte flatbuffers payload with Storage and Item -func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { +// MGet accept proto payload with Storage and Item +func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_mget") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} + keys := make([]string, 0, len(in.GetItems())) - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { ret, err := st.MGet(keys...) if err != nil { return err @@ -114,36 +85,15 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// MExpire accept []byte flatbuffers payload with Storage and Item -func (r *rpc) MExpire(in []byte, ok *bool) error { +// MExpire accept proto payload with Storage and Item +func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_mexpire") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - - // when unmarshalling the keys, simultaneously, fill up the slice with items - items := make([]Item, 0, l) - tmpItem := &generated.Item{} - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - - itc := Item{ - Key: string(tmpItem.Key()), - // we set up timeout on the keys, so, value here is redundant - Value: "", - TTL: string(tmpItem.Timeout()), - } - - items = append(items, itc) - } - - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { - err := st.MExpire(items...) + if st, exists := r.storages[in.GetStorage()]; exists { + err := st.MExpire(in.GetItems()...) if err != nil { return errors.E(op, err) } @@ -154,25 +104,19 @@ func (r *rpc) MExpire(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// TTL accept []byte flatbuffers payload with Storage and Item -func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { +// TTL accept proto payload with Storage and Item +func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_ttl") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { ret, err := st.TTL(keys...) if err != nil { return err @@ -183,24 +127,19 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// Delete accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Delete(in []byte, ok *bool) error { +// Delete accept proto payload with Storage and Item +func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rcp_delete") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { err := st.Delete(keys...) if err != nil { return errors.E(op, err) @@ -212,5 +151,5 @@ func (r *rpc) Delete(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } |