summaryrefslogtreecommitdiff
path: root/plugins/kv/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-04-22 00:48:35 +0300
committerValery Piashchynski <[email protected]>2021-04-22 00:48:35 +0300
commite4d65a41ec90747a387cfe769f743327959f7105 (patch)
tree2b5245fa0a86197d4699fc840c658ffa86cd957b /plugins/kv/rpc.go
parente1e168da92e0dca0e067e08ecb4cf264b9344d45 (diff)
- General interface, update RPC and Has/Set methods
Diffstat (limited to 'plugins/kv/rpc.go')
-rw-r--r--plugins/kv/rpc.go198
1 files changed, 121 insertions, 77 deletions
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 751f0d12..69b91981 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -1,110 +1,154 @@
package kv
import (
+ "unsafe"
+
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
// Wrapper for the plugin
-type RPCServer struct {
+type rpc struct {
+ // all available storages
+ storages map[string]Storage
// svc is a plugin implementing Storage interface
- svc Storage
+ srv *Plugin
// Logger
log logger.Logger
}
-// NewRPCServer construct RPC server for the particular plugin
-func NewRPCServer(srv Storage, log logger.Logger) *RPCServer {
- return &RPCServer{
- svc: srv,
- log: log,
- }
-}
-
-// data Data
-func (r *RPCServer) Has(in []string, res *map[string]bool) error {
- const op = errors.Op("rpc server Has")
- ret, err := r.svc.Has(in...)
- if err != nil {
- return errors.E(op, err)
- }
-
- // update the value in the pointer
- *res = ret
- return nil
-}
+// Has accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) Has(in []byte, res *map[string]bool) error {
+ const op = errors.Op("rpc_has")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
-// in SetData
-func (r *RPCServer) Set(in []Item, ok *bool) error {
- const op = errors.Op("rpc server Set")
+ keys := make([]string, 0, l)
- err := r.svc.Set(in...)
- if err != nil {
- return errors.E(op, err)
+ tmpItem := &generated.Item{}
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, strConvert(tmpItem.Key()))
}
- *ok = true
- return nil
-}
+ if st, ok := r.storages[strConvert(dataRoot.Storage())]; ok {
+ ret, err := st.Has(keys...)
+ if err != nil {
+ return err
+ }
-// in Data
-func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error {
- const op = errors.Op("rpc server MGet")
- ret, err := r.svc.MGet(in...)
- if err != nil {
- return errors.E(op, err)
+ // update the value in the pointer
+ // save the result
+ *res = ret
+ return nil
}
- // update return value
- *res = ret
- return nil
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in Data
-func (r *RPCServer) MExpire(in []Item, ok *bool) error {
- const op = errors.Op("rpc server MExpire")
+//Set accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) Set(in []byte, ok *bool) error {
+ const op = errors.Op("rpc_set")
- err := r.svc.MExpire(in...)
- if err != nil {
- return errors.E(op, err)
- }
+ dataRoot := generated.GetRootAsPayload(in, 0)
- *ok = true
- return nil
-}
+ l := dataRoot.ItemsLength()
+ items := make([]Item, 0, dataRoot.ItemsLength())
+ tmpItem := &generated.Item{}
-// in Data
-func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error {
- const op = errors.Op("rpc server TTL")
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
- ret, err := r.svc.TTL(in...)
- if err != nil {
- return errors.E(op, err)
+ itc := Item{
+ Key: string(tmpItem.Key()),
+ Value: string(tmpItem.Value()),
+ TTL: string(tmpItem.Timeout()),
+ }
+
+ items = append(items, itc)
}
- *res = ret
- return nil
-}
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.Set(items...)
+ if err != nil {
+ return err
+ }
-// in Data
-func (r *RPCServer) Delete(in []string, ok *bool) error {
- const op = errors.Op("rpc server Delete")
- err := r.svc.Delete(in...)
- if err != nil {
- return errors.E(op, err)
+ // save the result
+ *ok = true
+ return nil
}
- *ok = true
- return nil
-}
-// in string, storages
-func (r *RPCServer) Close(storage string, ok *bool) error {
- const op = errors.Op("rpc server Close")
- err := r.svc.Close()
- if err != nil {
- return errors.E(op, err)
- }
- *ok = true
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+}
- return nil
+// MGet accept []byte flatbuffers payload with Storage and Item
+//func (r *rpc) MGet(in []string, res *map[string]interface{}) error {
+// const op = errors.Op("rpc server MGet")
+// ret, err := r.svc.MGet(in...)
+// if err != nil {
+// return errors.E(op, err)
+// }
+//
+// // update return value
+// *res = ret
+// return nil
+//}
+
+//// MExpire accept []byte flatbuffers payload with Storage and Item
+//func (r *rpc) MExpire(in []Item, ok *bool) error {
+// const op = errors.Op("rpc server MExpire")
+//
+// err := r.svc.MExpire(in...)
+// if err != nil {
+// return errors.E(op, err)
+// }
+//
+// *ok = true
+// return nil
+//}
+//
+//// TTL accept []byte flatbuffers payload with Storage and Item
+//func (r *rpc) TTL(in []string, res *map[string]interface{}) error {
+// const op = errors.Op("rpc server TTL")
+//
+// ret, err := r.svc.TTL(in...)
+// if err != nil {
+// return errors.E(op, err)
+// }
+//
+// *res = ret
+// return nil
+//}
+//
+//// Delete accept []byte flatbuffers payload with Storage and Item
+//func (r *rpc) Delete(in []string, ok *bool) error {
+// const op = errors.Op("rpc server Delete")
+// err := r.svc.Delete(in...)
+// if err != nil {
+// return errors.E(op, err)
+// }
+// *ok = true
+// return nil
+//}
+//
+//// Close closes the storage connection
+//func (r *rpc) Close(storage string, ok *bool) error {
+// const op = errors.Op("rpc server Close")
+// err := r.svc.Close()
+// if err != nil {
+// return errors.E(op, err)
+// }
+// *ok = true
+//
+// return nil
+//}
+
+func strConvert(s []byte) string {
+ return *(*string)(unsafe.Pointer(&s))
}