summaryrefslogtreecommitdiff
path: root/plugins/kv/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/kv/rpc.go')
-rw-r--r--plugins/kv/rpc.go245
1 files changed, 178 insertions, 67 deletions
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 751f0d12..4947dbe3 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -1,110 +1,221 @@
package kv
import (
+ "unsafe"
+
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
// Wrapper for the plugin
-type RPCServer struct {
+type rpc struct {
+ // all available storages
+ storages map[string]Storage
// svc is a plugin implementing Storage interface
- svc Storage
+ srv *Plugin
// Logger
log logger.Logger
}
-// NewRPCServer construct RPC server for the particular plugin
-func NewRPCServer(srv Storage, log logger.Logger) *RPCServer {
- return &RPCServer{
- svc: srv,
- log: log,
+// Has accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) Has(in []byte, res *map[string]bool) error {
+ const op = errors.Op("rpc_has")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+
+ keys := make([]string, 0, l)
+
+ tmpItem := &generated.Item{}
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, strConvert(tmpItem.Key()))
}
-}
-// data Data
-func (r *RPCServer) Has(in []string, res *map[string]bool) error {
- const op = errors.Op("rpc server Has")
- ret, err := r.svc.Has(in...)
- if err != nil {
- return errors.E(op, err)
+ if st, ok := r.storages[strConvert(dataRoot.Storage())]; ok {
+ ret, err := st.Has(keys...)
+ if err != nil {
+ return err
+ }
+
+ // update the value in the pointer
+ // save the result
+ *res = ret
+ return nil
}
- // update the value in the pointer
- *res = ret
- return nil
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in SetData
-func (r *RPCServer) Set(in []Item, ok *bool) error {
- const op = errors.Op("rpc server Set")
+// Set accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) Set(in []byte, ok *bool) error {
+ const op = errors.Op("rpc_set")
+
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
- err := r.svc.Set(in...)
- if err != nil {
- return errors.E(op, err)
+ items := make([]Item, 0, dataRoot.ItemsLength())
+ tmpItem := &generated.Item{}
+
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+
+ itc := Item{
+ Key: string(tmpItem.Key()),
+ Value: string(tmpItem.Value()),
+ TTL: string(tmpItem.Timeout()),
+ }
+
+ items = append(items, itc)
}
- *ok = true
- return nil
-}
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.Set(items...)
+ if err != nil {
+ return err
+ }
-// in Data
-func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error {
- const op = errors.Op("rpc server MGet")
- ret, err := r.svc.MGet(in...)
- if err != nil {
- return errors.E(op, err)
+ // save the result
+ *ok = true
+ return nil
}
- // update return value
- *res = ret
- return nil
+ *ok = false
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in Data
-func (r *RPCServer) MExpire(in []Item, ok *bool) error {
- const op = errors.Op("rpc server MExpire")
+// MGet accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
+ const op = errors.Op("rpc_mget")
+
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+ keys := make([]string, 0, l)
+ tmpItem := &generated.Item{}
- err := r.svc.MExpire(in...)
- if err != nil {
- return errors.E(op, err)
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, string(tmpItem.Key()))
}
- *ok = true
- return nil
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ ret, err := st.MGet(keys...)
+ if err != nil {
+ return err
+ }
+
+ // save the result
+ *res = ret
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in Data
-func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error {
- const op = errors.Op("rpc server TTL")
+// MExpire accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) MExpire(in []byte, ok *bool) error {
+ const op = errors.Op("rpc_mexpire")
+
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+
+ // when unmarshalling the keys, simultaneously, fill up the slice with items
+ items := make([]Item, 0, l)
+ tmpItem := &generated.Item{}
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+
+ itc := Item{
+ Key: string(tmpItem.Key()),
+ // we set up timeout on the keys, so, value here is redundant
+ Value: "",
+ TTL: string(tmpItem.Timeout()),
+ }
+
+ items = append(items, itc)
+ }
+
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.MExpire(items...)
+ if err != nil {
+ return errors.E(op, err)
+ }
- ret, err := r.svc.TTL(in...)
- if err != nil {
- return errors.E(op, err)
+ // save the result
+ *ok = true
+ return nil
}
- *res = ret
- return nil
+ *ok = false
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in Data
-func (r *RPCServer) Delete(in []string, ok *bool) error {
- const op = errors.Op("rpc server Delete")
- err := r.svc.Delete(in...)
- if err != nil {
- return errors.E(op, err)
+// TTL accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
+ const op = errors.Op("rpc_ttl")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+ keys := make([]string, 0, l)
+ tmpItem := &generated.Item{}
+
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, string(tmpItem.Key()))
}
- *ok = true
- return nil
+
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ ret, err := st.TTL(keys...)
+ if err != nil {
+ return err
+ }
+
+ // save the result
+ *res = ret
+ return nil
+ }
+
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
}
-// in string, storages
-func (r *RPCServer) Close(storage string, ok *bool) error {
- const op = errors.Op("rpc server Close")
- err := r.svc.Close()
- if err != nil {
- return errors.E(op, err)
+// Delete accept []byte flatbuffers payload with Storage and Item
+func (r *rpc) Delete(in []byte, ok *bool) error {
+ const op = errors.Op("rcp_delete")
+ dataRoot := generated.GetRootAsPayload(in, 0)
+ l := dataRoot.ItemsLength()
+ keys := make([]string, 0, l)
+ tmpItem := &generated.Item{}
+
+ for i := 0; i < l; i++ {
+ if !dataRoot.Items(tmpItem, i) {
+ continue
+ }
+ keys = append(keys, string(tmpItem.Key()))
+ }
+ if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists {
+ err := st.Delete(keys...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // save the result
+ *ok = true
+ return nil
}
- *ok = true
- return nil
+ *ok = false
+ return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+}
+
+func strConvert(s []byte) string {
+ return *(*string)(unsafe.Pointer(&s))
}