diff options
Diffstat (limited to 'plugins/kv')
-rw-r--r-- | plugins/kv/drivers/boltdb/driver.go | 16 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/driver.go | 9 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/driver.go | 11 | ||||
-rw-r--r-- | plugins/kv/drivers/redis/driver.go | 13 | ||||
-rw-r--r-- | plugins/kv/interface.go | 4 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 46 |
6 files changed, 53 insertions, 46 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index ba873513..253b9d33 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -162,7 +162,7 @@ func (d *Driver) Get(key string) ([]byte, error) { return val, nil } -func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("boltdb_driver_mget") // defense if keys == nil { @@ -177,7 +177,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string][]byte, len(keys)) err := d.DB.View(func(tx *bolt.Tx) error { b := tx.Bucket(d.bucket) @@ -186,7 +186,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } buf := new(bytes.Buffer) - var out string + var out []byte buf.Grow(100) for i := range keys { value := b.Get([]byte(keys[i])) @@ -200,7 +200,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } m[keys[i]] = out buf.Reset() - out = "" + out = nil } } @@ -241,8 +241,8 @@ func (d *Driver) Set(items ...*kvv1.Item) error { // performance note: pass a prepared bytes slice with initial cap // we can't move buf and gob out of loop, because we need to clear both from data // but gob will contain (w/o re-init) the past data - buf := bytes.Buffer{} - encoder := gob.NewEncoder(&buf) + buf := new(bytes.Buffer) + encoder := gob.NewEncoder(buf) if errors.Is(errors.EmptyItem, err) { return errors.E(op, errors.EmptyItem) } @@ -342,7 +342,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error { return nil } -func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { +func (d *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("boltdb_driver_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -356,7 +356,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string]string, len(keys)) for i := range keys { if item, ok := d.gc.Load(keys[i]); ok { diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 8ea515d0..c1f79cbb 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -10,7 +10,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -97,7 +96,7 @@ func (d *Driver) Get(key string) ([]byte, error) { // MGet return map with key -- string // and map value as value -- []byte -func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("memcached_plugin_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -111,7 +110,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string][]byte, len(keys)) for i := range keys { // Here also MultiGet data, err := d.client.Get(keys[i]) @@ -150,7 +149,7 @@ func (d *Driver) Set(items ...*kvv1.Item) error { memcachedItem := &memcache.Item{ Key: items[i].Key, // unsafe convert - Value: utils.AsBytes(items[i].Value), + Value: items[i].Value, Flags: 0, } @@ -206,7 +205,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error { } // TTL return time in seconds (int32) for a given keys -func (d *Driver) TTL(_ ...string) (map[string]interface{}, error) { +func (d *Driver) TTL(_ ...string) (map[string]string, error) { const op = errors.Op("memcached_plugin_ttl") return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) } diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index 3158adee..9b7d7259 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -10,7 +10,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -72,12 +71,12 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return utils.AsBytes(data.(*kvv1.Item).Value), nil + return data.(*kvv1.Item).Value, nil } return nil, nil } -func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { +func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("in_memory_plugin_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -91,7 +90,7 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string][]byte, len(keys)) for i := range keys { if value, ok := s.heap.Load(keys[i]); ok { @@ -160,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { return nil } -func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { +func (s *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("in_memory_plugin_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -174,7 +173,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string]string, len(keys)) for i := range keys { if item, ok := s.heap.Load(keys[i]); ok { diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go index 0aaa6352..66cb8384 100644 --- a/plugins/kv/drivers/redis/driver.go +++ b/plugins/kv/drivers/redis/driver.go @@ -11,6 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -101,7 +102,7 @@ func (d *Driver) Get(key string) ([]byte, error) { // MGet loads content of multiple values (some values might be skipped). // https://redis.io/commands/mget // Returns slice with the interfaces with values -func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("redis_driver_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -115,7 +116,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string][]byte, len(keys)) for _, k := range keys { cmd := d.universalClient.Get(context.Background(), k) @@ -126,7 +127,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { return nil, errors.E(op, cmd.Err()) } - m[k] = cmd.Val() + m[k] = utils.AsBytes(cmd.Val()) } return m, nil @@ -213,7 +214,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error { // TTL https://redis.io/commands/ttl // return time in seconds (float64) for a given keys -func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { +func (d *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("redis_driver_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -227,7 +228,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string]string, len(keys)) for _, key := range keys { duration, err := d.universalClient.TTL(context.Background(), key).Result() @@ -235,7 +236,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { return nil, err } - m[key] = duration.Seconds() + m[key] = duration.String() } return m, nil } diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 7841f9a2..744c6b51 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -12,7 +12,7 @@ type Storage interface { // MGet loads content of multiple values // Returns the map with existing keys and associated values - MGet(keys ...string) (map[string]interface{}, error) + MGet(keys ...string) (map[string][]byte, error) // Set used to upload item to KV with TTL // 0 value in TTL means no TTL @@ -23,7 +23,7 @@ type Storage interface { // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb - TTL(keys ...string) (map[string]interface{}, error) + TTL(keys ...string) (map[string]string, error) // Delete one or multiple keys. Delete(keys ...string) error diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 557d3ee1..ab1f7f31 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -17,7 +17,7 @@ type rpc struct { } // Has accept []*kvv1.Payload proto payload with Storage and Item -func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { +func (r *rpc) Has(in *kvv1.Request, out *kvv1.Response) error { const op = errors.Op("rpc_has") if in.GetStorage() == "" { @@ -38,7 +38,12 @@ func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { // update the value in the pointer // save the result - *res = ret + out.Items = make([]*kvv1.Item, 0, len(ret)) + for k := range ret { + out.Items = append(out.Items, &kvv1.Item{ + Key: k, + }) + } return nil } @@ -46,7 +51,7 @@ func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { } // Set accept proto payload with Storage and Item -func (r *rpc) Set(in *kvv1.Payload, ok *bool) error { +func (r *rpc) Set(in *kvv1.Request, _ *kvv1.Response) error { const op = errors.Op("rpc_set") if st, exists := r.storages[in.GetStorage()]; exists { @@ -56,16 +61,14 @@ func (r *rpc) Set(in *kvv1.Payload, ok *bool) error { } // save the result - *ok = true return nil } - *ok = false return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } // MGet accept proto payload with Storage and Item -func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { +func (r *rpc) MGet(in *kvv1.Request, out *kvv1.Response) error { const op = errors.Op("rpc_mget") keys := make([]string, 0, len(in.GetItems())) @@ -80,8 +83,13 @@ func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { return errors.E(op, err) } - // save the result - *res = ret + out.Items = make([]*kvv1.Item, 0, len(ret)) + for k := range ret { + out.Items = append(out.Items, &kvv1.Item{ + Key: k, + Value: ret[k], + }) + } return nil } @@ -89,7 +97,7 @@ func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { } // MExpire accept proto payload with Storage and Item -func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error { +func (r *rpc) MExpire(in *kvv1.Request, _ *kvv1.Response) error { const op = errors.Op("rpc_mexpire") if st, exists := r.storages[in.GetStorage()]; exists { @@ -98,17 +106,14 @@ func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error { return errors.E(op, err) } - // save the result - *ok = true return nil } - *ok = false return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } // TTL accept proto payload with Storage and Item -func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { +func (r *rpc) TTL(in *kvv1.Request, out *kvv1.Response) error { const op = errors.Op("rpc_ttl") keys := make([]string, 0, len(in.GetItems())) @@ -122,8 +127,14 @@ func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { return errors.E(op, err) } - // save the result - *res = ret + out.Items = make([]*kvv1.Item, 0, len(ret)) + for k := range ret { + out.Items = append(out.Items, &kvv1.Item{ + Key: k, + Timeout: ret[k], + }) + } + return nil } @@ -131,7 +142,7 @@ func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { } // Delete accept proto payload with Storage and Item -func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error { +func (r *rpc) Delete(in *kvv1.Request, _ *kvv1.Response) error { const op = errors.Op("rcp_delete") keys := make([]string, 0, len(in.GetItems())) @@ -145,11 +156,8 @@ func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error { return errors.E(op, err) } - // save the result - *ok = true return nil } - *ok = false return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } |