summaryrefslogtreecommitdiff
path: root/plugins/kv/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-04-22 14:41:50 +0300
committerValery Piashchynski <[email protected]>2021-04-22 14:41:50 +0300
commitabf606afd6fd9fbb0fd374ab5da41a8ee8d5a15d (patch)
treeb0cfdeb5c478463d853b8a7409e88cfa7219c3fa /plugins/kv/rpc.go
parente4d65a41ec90747a387cfe769f743327959f7105 (diff)
- Implement tests for the KV
- Implement Storage interface for the boltdb,memory,memcached drivers Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv/rpc.go')
-rw-r--r--plugins/kv/rpc.go209
1 files changed, 147 insertions, 62 deletions
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 69b91981..cc3875e0 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -49,13 +49,13 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error {
return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-//Set accept []byte flatbuffers payload with Storage and Item
+// Set accept []byte flatbuffers payload with Storage and Item
func (r *rpc) Set(in []byte, ok *bool) error {
const op = errors.Op("rpc_set")
dataRoot := generated.GetRootAsPayload(in, 0)
-
l := dataRoot.ItemsLength()
+
items := make([]Item, 0, dataRoot.ItemsLength())
tmpItem := &generated.Item{}
@@ -84,70 +84,155 @@ func (r *rpc) Set(in []byte, ok *bool) error {
return nil
}
+ *ok = false
return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
// MGet accept []byte flatbuffers payload with Storage and Item
-//func (r *rpc) MGet(in []string, res *map[string]interface{}) error {
-// const op = errors.Op("rpc server MGet")
-// ret, err := r.svc.MGet(in...)
-// if err != nil {
-// return errors.E(op, err)
-// }
-//
-// // update return value
-// *res = ret
-// return nil
-//}
-
-//// MExpire accept []byte flatbuffers payload with Storage and Item
-//func (r *rpc) MExpire(in []Item, ok *bool) error {
-// const op = errors.Op("rpc server MExpire")
-//
-// err := r.svc.MExpire(in...)
-// if err != nil {
-// return errors.E(op, err)
-// }
-//
-// *ok = true
-// return nil
-//}
-//
-//// TTL accept []byte flatbuffers payload with Storage and Item
-//func (r *rpc) TTL(in []string, res *map[string]interface{}) error {
-// const op = errors.Op("rpc server TTL")
-//
-// ret, err := r.svc.TTL(in...)
-// if err != nil {
-// return errors.E(op, err)
-// }
-//
-// *res = ret
-// return nil
-//}
-//
-//// Delete accept []byte flatbuffers payload with Storage and Item
-//func (r *rpc) Delete(in []string, ok *bool) error {
-// const op = errors.Op("rpc server Delete")
-// err := r.svc.Delete(in...)
-// if err != nil {
-// return errors.E(op, err)
-// }
-// *ok = true
-// return nil
-//}
-//
-//// Close closes the storage connection
-//func (r *rpc) Close(storage string, ok *bool) error {
-// const op = errors.Op("rpc server Close")
-// err := r.svc.Close()
-// if err != nil {
-// return errors.E(op, err)
-// }
-// *ok = true
-//
-// return nil
-//}
+func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
+ const op = errors.Op("rpc_mget")
+
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+ keys := make([]string, 0, l)
+ tmpItem := &generated.Item{}
+
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, string(tmpItem.Key()))
+ }
+
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ ret, err := st.MGet(keys...)
+ if err != nil {
+ return err
+ }
+
+ // save the result
+ *res = ret
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+}
+
+// MExpire accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) MExpire(in []byte, ok *bool) error {
+ const op = errors.Op("rpc_mexpire")
+
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+
+ // when unmarshalling the keys, simultaneously, fill up the slice with items
+ items := make([]Item, 0, l)
+ tmpItem := &generated.Item{}
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+
+ itc := Item{
+ Key: string(tmpItem.Key()),
+ // we set up timeout on the keys, so, value here is redundant
+ Value: "",
+ TTL: string(tmpItem.Timeout()),
+ }
+
+ items = append(items, itc)
+ }
+
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.MExpire(items...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // save the result
+ *ok = true
+ return nil
+ }
+
+ *ok = false
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+}
+
+// TTL accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
+ const op = errors.Op("rpc_ttl")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+ keys := make([]string, 0, l)
+ tmpItem := &generated.Item{}
+
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, string(tmpItem.Key()))
+ }
+
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ ret, err := st.TTL(keys...)
+ if err != nil {
+ return err
+ }
+
+ // save the result
+ *res = ret
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+}
+
+// Delete accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) Delete(in []byte, ok *bool) error {
+ const op = errors.Op("rcp_delete")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+ keys := make([]string, 0, l)
+ tmpItem := &generated.Item{}
+
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, string(tmpItem.Key()))
+ }
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.Delete(keys...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // save the result
+ *ok = true
+ return nil
+ }
+
+ *ok = false
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+}
+
+// Close closes the storage connection
+func (r *rpc) Close(storage string, ok *bool) error {
+ const op = errors.Op("rpc_close")
+ if st, exists := r.storages[storage]; exists {
+ err := st.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // save the result
+ *ok = true
+ return nil
+ }
+
+ *ok = false
+ return nil
+}
func strConvert(s []byte) string {
return *(*string)(unsafe.Pointer(&s))