From e4d65a41ec90747a387cfe769f743327959f7105 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Apr 2021 00:48:35 +0300 Subject: - General interface, update RPC and Has/Set methods --- plugins/kv/rpc.go | 198 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 121 insertions(+), 77 deletions(-) (limited to 'plugins/kv/rpc.go') diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 751f0d12..69b91981 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -1,110 +1,154 @@ package kv import ( + "unsafe" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" "github.com/spiral/roadrunner/v2/plugins/logger" ) // Wrapper for the plugin -type RPCServer struct { +type rpc struct { + // all available storages + storages map[string]Storage // svc is a plugin implementing Storage interface - svc Storage + srv *Plugin // Logger log logger.Logger } -// NewRPCServer construct RPC server for the particular plugin -func NewRPCServer(srv Storage, log logger.Logger) *RPCServer { - return &RPCServer{ - svc: srv, - log: log, - } -} - -// data Data -func (r *RPCServer) Has(in []string, res *map[string]bool) error { - const op = errors.Op("rpc server Has") - ret, err := r.svc.Has(in...) - if err != nil { - return errors.E(op, err) - } - - // update the value in the pointer - *res = ret - return nil -} +// Has accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Has(in []byte, res *map[string]bool) error { + const op = errors.Op("rpc_has") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() -// in SetData -func (r *RPCServer) Set(in []Item, ok *bool) error { - const op = errors.Op("rpc server Set") + keys := make([]string, 0, l) - err := r.svc.Set(in...) - if err != nil { - return errors.E(op, err) + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, strConvert(tmpItem.Key())) } - *ok = true - return nil -} + if st, ok := r.storages[strConvert(dataRoot.Storage())]; ok { + ret, err := st.Has(keys...) + if err != nil { + return err + } -// in Data -func (r *RPCServer) MGet(in []string, res *map[string]interface{}) error { - const op = errors.Op("rpc server MGet") - ret, err := r.svc.MGet(in...) - if err != nil { - return errors.E(op, err) + // update the value in the pointer + // save the result + *res = ret + return nil } - // update return value - *res = ret - return nil + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// in Data -func (r *RPCServer) MExpire(in []Item, ok *bool) error { - const op = errors.Op("rpc server MExpire") +//Set accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Set(in []byte, ok *bool) error { + const op = errors.Op("rpc_set") - err := r.svc.MExpire(in...) - if err != nil { - return errors.E(op, err) - } + dataRoot := generated.GetRootAsPayload(in, 0) - *ok = true - return nil -} + l := dataRoot.ItemsLength() + items := make([]Item, 0, dataRoot.ItemsLength()) + tmpItem := &generated.Item{} -// in Data -func (r *RPCServer) TTL(in []string, res *map[string]interface{}) error { - const op = errors.Op("rpc server TTL") + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } - ret, err := r.svc.TTL(in...) - if err != nil { - return errors.E(op, err) + itc := Item{ + Key: string(tmpItem.Key()), + Value: string(tmpItem.Value()), + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) } - *res = ret - return nil -} + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Set(items...) + if err != nil { + return err + } -// in Data -func (r *RPCServer) Delete(in []string, ok *bool) error { - const op = errors.Op("rpc server Delete") - err := r.svc.Delete(in...) - if err != nil { - return errors.E(op, err) + // save the result + *ok = true + return nil } - *ok = true - return nil -} -// in string, storages -func (r *RPCServer) Close(storage string, ok *bool) error { - const op = errors.Op("rpc server Close") - err := r.svc.Close() - if err != nil { - return errors.E(op, err) - } - *ok = true + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} - return nil +// MGet accept []byte flatbuffers payload with Storage and Item +//func (r *rpc) MGet(in []string, res *map[string]interface{}) error { +// const op = errors.Op("rpc server MGet") +// ret, err := r.svc.MGet(in...) +// if err != nil { +// return errors.E(op, err) +// } +// +// // update return value +// *res = ret +// return nil +//} + +//// MExpire accept []byte flatbuffers payload with Storage and Item +//func (r *rpc) MExpire(in []Item, ok *bool) error { +// const op = errors.Op("rpc server MExpire") +// +// err := r.svc.MExpire(in...) +// if err != nil { +// return errors.E(op, err) +// } +// +// *ok = true +// return nil +//} +// +//// TTL accept []byte flatbuffers payload with Storage and Item +//func (r *rpc) TTL(in []string, res *map[string]interface{}) error { +// const op = errors.Op("rpc server TTL") +// +// ret, err := r.svc.TTL(in...) +// if err != nil { +// return errors.E(op, err) +// } +// +// *res = ret +// return nil +//} +// +//// Delete accept []byte flatbuffers payload with Storage and Item +//func (r *rpc) Delete(in []string, ok *bool) error { +// const op = errors.Op("rpc server Delete") +// err := r.svc.Delete(in...) +// if err != nil { +// return errors.E(op, err) +// } +// *ok = true +// return nil +//} +// +//// Close closes the storage connection +//func (r *rpc) Close(storage string, ok *bool) error { +// const op = errors.Op("rpc server Close") +// err := r.svc.Close() +// if err != nil { +// return errors.E(op, err) +// } +// *ok = true +// +// return nil +//} + +func strConvert(s []byte) string { + return *(*string)(unsafe.Pointer(&s)) } -- cgit v1.2.3 From abf606afd6fd9fbb0fd374ab5da41a8ee8d5a15d Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Apr 2021 14:41:50 +0300 Subject: - Implement tests for the KV - Implement Storage interface for the boltdb,memory,memcached drivers Signed-off-by: Valery Piashchynski --- plugins/kv/rpc.go | 209 ++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 147 insertions(+), 62 deletions(-) (limited to 'plugins/kv/rpc.go') diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 69b91981..cc3875e0 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -49,13 +49,13 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error { return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -//Set accept []byte flatbuffers payload with Storage and Item +// Set accept []byte flatbuffers payload with Storage and Item func (r *rpc) Set(in []byte, ok *bool) error { const op = errors.Op("rpc_set") dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() + items := make([]Item, 0, dataRoot.ItemsLength()) tmpItem := &generated.Item{} @@ -84,70 +84,155 @@ func (r *rpc) Set(in []byte, ok *bool) error { return nil } + *ok = false return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } // MGet accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) MGet(in []string, res *map[string]interface{}) error { -// const op = errors.Op("rpc server MGet") -// ret, err := r.svc.MGet(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// // update return value -// *res = ret -// return nil -//} - -//// MExpire accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) MExpire(in []Item, ok *bool) error { -// const op = errors.Op("rpc server MExpire") -// -// err := r.svc.MExpire(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// *ok = true -// return nil -//} -// -//// TTL accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) TTL(in []string, res *map[string]interface{}) error { -// const op = errors.Op("rpc server TTL") -// -// ret, err := r.svc.TTL(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// *res = ret -// return nil -//} -// -//// Delete accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) Delete(in []string, ok *bool) error { -// const op = errors.Op("rpc server Delete") -// err := r.svc.Delete(in...) -// if err != nil { -// return errors.E(op, err) -// } -// *ok = true -// return nil -//} -// -//// Close closes the storage connection -//func (r *rpc) Close(storage string, ok *bool) error { -// const op = errors.Op("rpc server Close") -// err := r.svc.Close() -// if err != nil { -// return errors.E(op, err) -// } -// *ok = true -// -// return nil -//} +func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_mget") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.MGet(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// MExpire accept []byte flatbuffers payload with Storage and Item +func (r *rpc) MExpire(in []byte, ok *bool) error { + const op = errors.Op("rpc_mexpire") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + + // when unmarshalling the keys, simultaneously, fill up the slice with items + items := make([]Item, 0, l) + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + + itc := Item{ + Key: string(tmpItem.Key()), + // we set up timeout on the keys, so, value here is redundant + Value: "", + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.MExpire(items...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// TTL accept []byte flatbuffers payload with Storage and Item +func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_ttl") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.TTL(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// Delete accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Delete(in []byte, ok *bool) error { + const op = errors.Op("rcp_delete") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Delete(keys...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// Close closes the storage connection +func (r *rpc) Close(storage string, ok *bool) error { + const op = errors.Op("rpc_close") + if st, exists := r.storages[storage]; exists { + err := st.Close() + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return nil +} func strConvert(s []byte) string { return *(*string)(unsafe.Pointer(&s)) -- cgit v1.2.3 From 91c1fa2e2693cb662425c1ba7cca2325a8458995 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 22 Apr 2021 22:03:59 +0300 Subject: - Rework storage drivers Signed-off-by: Valery Piashchynski --- plugins/kv/rpc.go | 18 ------------------ 1 file changed, 18 deletions(-) (limited to 'plugins/kv/rpc.go') diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index cc3875e0..4947dbe3 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -216,24 +216,6 @@ func (r *rpc) Delete(in []byte, ok *bool) error { return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -// Close closes the storage connection -func (r *rpc) Close(storage string, ok *bool) error { - const op = errors.Op("rpc_close") - if st, exists := r.storages[storage]; exists { - err := st.Close() - if err != nil { - return errors.E(op, err) - } - - // save the result - *ok = true - return nil - } - - *ok = false - return nil -} - func strConvert(s []byte) string { return *(*string)(unsafe.Pointer(&s)) } -- cgit v1.2.3