diff options
author | Valery Piashchynski <[email protected]> | 2021-04-22 00:48:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-04-22 00:48:35 +0300 |
commit | e4d65a41ec90747a387cfe769f743327959f7105 (patch) | |
tree | 2b5245fa0a86197d4699fc840c658ffa86cd957b /plugins/kv/rpc.go | |
parent | e1e168da92e0dca0e067e08ecb4cf264b9344d45 (diff) |
- General interface, update RPC and Has/Set methods
Diffstat (limited to 'plugins/kv/rpc.go')
-rw-r--r-- | plugins/kv/rpc.go | 198 |
1 files changed, 121 insertions, 77 deletions
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 751f0d12..69b91981 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -1,110 +1,154 @@ package kv import ( + "unsafe" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" "github.com/spiral/roadrunner/v2/plugins/logger" ) // Wrapper for the plugin -type RPCServer struct { +type rpc struct { + // all available storages + storages map[string]Storage // svc is a plugin implementing Storage interface - svc Storage + srv *Plugin // Logger log logger.Logger } -// NewRPCServer construct RPC server for the particular plugin -func NewRPCServer(srv Storage, log logger.Logger) *RPCServer { - return &RPCServer{ - svc: srv, - log: log, - } -} - -// data Data -func (r *RPCServer) Has(in []string, res *map[string]bool) error { - const op = errors.Op("rpc server Has") - ret, err := r.svc.Has(in...) - if err != nil { - return errors.E(op, err) - } - - // update the value in the pointer - *res = ret - return nil -} +// Has accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Has(in []byte, res *map[string]bool) error { + const op = errors.Op("rpc_has") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() -// in SetData -func (r *RPCServer) Set(in []Item, ok *bool) error { - const op = errors.Op("rpc server Set") + keys := make([]string, 0, l) - err := r.svc.Set(in...) - if err != nil { - return errors.E(op, err) + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, strConvert(tmpItem.Key())) } - *ok = true - return nil -} + if st, ok := r.storages[strConvert(dataRoot.Storage())]; ok { + ret, err := st.Has(keys...) + if err != nil { + return err + } -// in Data -func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error { - const op = errors.Op("rpc server MGet") - ret, err := r.svc.MGet(in...) - if err != nil { - return errors.E(op, err) + // update the value in the pointer + // save the result + *res = ret + return nil } - // update return value - *res = ret - return nil + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in Data -func (r *RPCServer) MExpire(in []Item, ok *bool) error { - const op = errors.Op("rpc server MExpire") +//Set accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Set(in []byte, ok *bool) error { + const op = errors.Op("rpc_set") - err := r.svc.MExpire(in...) - if err != nil { - return errors.E(op, err) - } + dataRoot := generated.GetRootAsPayload(in, 0) - *ok = true - return nil -} + l := dataRoot.ItemsLength() + items := make([]Item, 0, dataRoot.ItemsLength()) + tmpItem := &generated.Item{} -// in Data -func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error { - const op = errors.Op("rpc server TTL") + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } - ret, err := r.svc.TTL(in...) - if err != nil { - return errors.E(op, err) + itc := Item{ + Key: string(tmpItem.Key()), + Value: string(tmpItem.Value()), + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) } - *res = ret - return nil -} + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Set(items...) + if err != nil { + return err + } -// in Data -func (r *RPCServer) Delete(in []string, ok *bool) error { - const op = errors.Op("rpc server Delete") - err := r.svc.Delete(in...) - if err != nil { - return errors.E(op, err) + // save the result + *ok = true + return nil } - *ok = true - return nil -} -// in string, storages -func (r *RPCServer) Close(storage string, ok *bool) error { - const op = errors.Op("rpc server Close") - err := r.svc.Close() - if err != nil { - return errors.E(op, err) - } - *ok = true + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} - return nil +// MGet accept []byte flatbuffers payload with Storage and Item +//func (r *rpc) MGet(in []string, res *map[string]interface{}) error { +// const op = errors.Op("rpc server MGet") +// ret, err := r.svc.MGet(in...) +// if err != nil { +// return errors.E(op, err) +// } +// +// // update return value +// *res = ret +// return nil +//} + +//// MExpire accept []byte flatbuffers payload with Storage and Item +//func (r *rpc) MExpire(in []Item, ok *bool) error { +// const op = errors.Op("rpc server MExpire") +// +// err := r.svc.MExpire(in...) +// if err != nil { +// return errors.E(op, err) +// } +// +// *ok = true +// return nil +//} +// +//// TTL accept []byte flatbuffers payload with Storage and Item +//func (r *rpc) TTL(in []string, res *map[string]interface{}) error { +// const op = errors.Op("rpc server TTL") +// +// ret, err := r.svc.TTL(in...) +// if err != nil { +// return errors.E(op, err) +// } +// +// *res = ret +// return nil +//} +// +//// Delete accept []byte flatbuffers payload with Storage and Item +//func (r *rpc) Delete(in []string, ok *bool) error { +// const op = errors.Op("rpc server Delete") +// err := r.svc.Delete(in...) +// if err != nil { +// return errors.E(op, err) +// } +// *ok = true +// return nil +//} +// +//// Close closes the storage connection +//func (r *rpc) Close(storage string, ok *bool) error { +// const op = errors.Op("rpc server Close") +// err := r.svc.Close() +// if err != nil { +// return errors.E(op, err) +// } +// *ok = true +// +// return nil +//} + +func strConvert(s []byte) string { + return *(*string)(unsafe.Pointer(&s)) } |