diff options
author | Valery Piashchynski <[email protected]> | 2021-04-22 14:41:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-04-22 14:41:50 +0300 |
commit | abf606afd6fd9fbb0fd374ab5da41a8ee8d5a15d (patch) | |
tree | b0cfdeb5c478463d853b8a7409e88cfa7219c3fa /plugins/kv/rpc.go | |
parent | e4d65a41ec90747a387cfe769f743327959f7105 (diff) |
- Implement tests for the KV
- Implement Storage interface for the boltdb,memory,memcached drivers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/kv/rpc.go')
-rw-r--r-- | plugins/kv/rpc.go | 209 |
1 files changed, 147 insertions, 62 deletions
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 69b91981..cc3875e0 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -49,13 +49,13 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error { return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } -//Set accept []byte flatbuffers payload with Storage and Item +// Set accept []byte flatbuffers payload with Storage and Item func (r *rpc) Set(in []byte, ok *bool) error { const op = errors.Op("rpc_set") dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() + items := make([]Item, 0, dataRoot.ItemsLength()) tmpItem := &generated.Item{} @@ -84,70 +84,155 @@ func (r *rpc) Set(in []byte, ok *bool) error { return nil } + *ok = false return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) } // MGet accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) MGet(in []string, res *map[string]interface{}) error { -// const op = errors.Op("rpc server MGet") -// ret, err := r.svc.MGet(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// // update return value -// *res = ret -// return nil -//} - -//// MExpire accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) MExpire(in []Item, ok *bool) error { -// const op = errors.Op("rpc server MExpire") -// -// err := r.svc.MExpire(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// *ok = true -// return nil -//} -// -//// TTL accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) TTL(in []string, res *map[string]interface{}) error { -// const op = errors.Op("rpc server TTL") -// -// ret, err := r.svc.TTL(in...) -// if err != nil { -// return errors.E(op, err) -// } -// -// *res = ret -// return nil -//} -// -//// Delete accept []byte flatbuffers payload with Storage and Item -//func (r *rpc) Delete(in []string, ok *bool) error { -// const op = errors.Op("rpc server Delete") -// err := r.svc.Delete(in...) -// if err != nil { -// return errors.E(op, err) -// } -// *ok = true -// return nil -//} -// -//// Close closes the storage connection -//func (r *rpc) Close(storage string, ok *bool) error { -// const op = errors.Op("rpc server Close") -// err := r.svc.Close() -// if err != nil { -// return errors.E(op, err) -// } -// *ok = true -// -// return nil -//} +func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_mget") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.MGet(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// MExpire accept []byte flatbuffers payload with Storage and Item +func (r *rpc) MExpire(in []byte, ok *bool) error { + const op = errors.Op("rpc_mexpire") + + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + + // when unmarshalling the keys, simultaneously, fill up the slice with items + items := make([]Item, 0, l) + tmpItem := &generated.Item{} + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + + itc := Item{ + Key: string(tmpItem.Key()), + // we set up timeout on the keys, so, value here is redundant + Value: "", + TTL: string(tmpItem.Timeout()), + } + + items = append(items, itc) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.MExpire(items...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// TTL accept []byte flatbuffers payload with Storage and Item +func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { + const op = errors.Op("rpc_ttl") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + ret, err := st.TTL(keys...) + if err != nil { + return err + } + + // save the result + *res = ret + return nil + } + + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// Delete accept []byte flatbuffers payload with Storage and Item +func (r *rpc) Delete(in []byte, ok *bool) error { + const op = errors.Op("rcp_delete") + dataRoot := generated.GetRootAsPayload(in, 0) + l := dataRoot.ItemsLength() + keys := make([]string, 0, l) + tmpItem := &generated.Item{} + + for i := 0; i < l; i++ { + if !dataRoot.Items(tmpItem, i) { + continue + } + keys = append(keys, string(tmpItem.Key())) + } + if st, exists := r.storages[strConvert(dataRoot.Storage())]; exists { + err := st.Delete(keys...) + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) +} + +// Close closes the storage connection +func (r *rpc) Close(storage string, ok *bool) error { + const op = errors.Op("rpc_close") + if st, exists := r.storages[storage]; exists { + err := st.Close() + if err != nil { + return errors.E(op, err) + } + + // save the result + *ok = true + return nil + } + + *ok = false + return nil +} func strConvert(s []byte) string { return *(*string)(unsafe.Pointer(&s)) |