diff options
Diffstat (limited to 'plugins/kv/rpc.go')
-rw-r--r-- | plugins/kv/rpc.go | 245 |
1 files changed, 178 insertions, 67 deletions
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 751f0d12..4947dbe3 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -1,110 +1,221 @@ package kv import ( + "unsafe" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" "github.com/spiral/roadrunner/v2/plugins/logger" ) // Wrapper for the plugin -type RPCServer struct { +type rpc struct { + // all available storages + storages map[string]Storage // svc is a plugin implementing Storage interface - svc Storage + srv *Plugin // Logger log logger.Logger } -// NewRPCServer construct RPC server for the particular plugin -func NewRPCServer(srv Storage, log logger.Logger) *RPCServer { - return &RPCServer{ - svc: srv, - log: log, +// Has accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Has(in []byte, res *map[string]bool) error { + const op = errors.Op("rpc_has") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + + keys := make([]string, 0, l) + + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, strConvert(tmpItem.Key())) } -} -// data Data -func (r *RPCServer) Has(in []string, res *map[string]bool) error { - const op = errors.Op("rpc server Has") - ret, err := r.svc.Has(in...) - if err != nil { - return errors.E(op, err) + if st, ok := r.storages[strConvert(dataRoot.Storage())]; ok { + ret, err := st.Has(keys...) + if err != nil { + return err + } + + // update the value in the pointer + // save the result + *res = ret + return nil } - // update the value in the pointer - *res = ret - return nil + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in SetData -func (r *RPCServer) Set(in []Item, ok *bool) error { - const op = errors.Op("rpc server Set") +// Set accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Set(in []byte, ok *bool) error { + const op = errors.Op("rpc_set") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() - err := r.svc.Set(in...) - if err != nil { - return errors.E(op, err) + items := make([]Item, 0, dataRoot.ItemsLength()) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + + itc := Item{ + Key: string(tmpItem.Key()), + Value: string(tmpItem.Value()), + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) } - *ok = true - return nil -} + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Set(items...) + if err != nil { + return err + } -// in Data -func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error { - const op = errors.Op("rpc server MGet") - ret, err := r.svc.MGet(in...) - if err != nil { - return errors.E(op, err) + // save the result + *ok = true + return nil } - // update return value - *res = ret - return nil + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in Data -func (r *RPCServer) MExpire(in []Item, ok *bool) error { - const op = errors.Op("rpc server MExpire") +// MGet accept []byte flatbuffers payload with Storage and Item +func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_mget") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} - err := r.svc.MExpire(in...) - if err != nil { - return errors.E(op, err) + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) } - *ok = true - return nil + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.MGet(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in Data -func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error { - const op = errors.Op("rpc server TTL") +// MExpire accept []byte flatbuffers payload with Storage and Item +func (r *rpc) MExpire(in []byte, ok *bool) error { + const op = errors.Op("rpc_mexpire") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + + // when unmarshalling the keys, simultaneously, fill up the slice with items + items := make([]Item, 0, l) + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + + itc := Item{ + Key: string(tmpItem.Key()), + // we set up timeout on the keys, so, value here is redundant + Value: "", + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.MExpire(items...) + if err != nil { + return errors.E(op, err) + } - ret, err := r.svc.TTL(in...) - if err != nil { - return errors.E(op, err) + // save the result + *ok = true + return nil } - *res = ret - return nil + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in Data -func (r *RPCServer) Delete(in []string, ok *bool) error { - const op = errors.Op("rpc server Delete") - err := r.svc.Delete(in...) - if err != nil { - return errors.E(op, err) +// TTL accept []byte flatbuffers payload with Storage and Item +func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_ttl") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) } - *ok = true - return nil + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.TTL(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in string, storages -func (r *RPCServer) Close(storage string, ok *bool) error { - const op = errors.Op("rpc server Close") - err := r.svc.Close() - if err != nil { - return errors.E(op, err) +// Delete accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Delete(in []byte, ok *bool) error { + const op = errors.Op("rcp_delete") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Delete(keys...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil } - *ok = true - return nil + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +func strConvert(s []byte) string { + return *(*string)(unsafe.Pointer(&s)) } |