summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-09 20:26:18 +0300
committerGitHub <[email protected]>2021-06-09 20:26:18 +0300
commitb99bfbe21a0f44b1a16b9110d779719fc637127c (patch)
tree3aabdb96c86a59325d816ad64cabc967ef2c8f10 /plugins
parent8fdf05d4f360a9f6344141b273eab9d6859470e0 (diff)
parent7665167623147403d575b7e2cf125073cbe6584d (diff)
#715 feat(protocol): use protobuf for the `kv` and `websockets` RPC callsv2.3.0-beta.3
#715 feat(protocol): use protobuf for the `kv` and `websockets` RPC calls
Diffstat (limited to 'plugins')
-rw-r--r--plugins/kv/drivers/boltdb/driver.go16
-rw-r--r--plugins/kv/drivers/memcached/driver.go9
-rw-r--r--plugins/kv/drivers/memory/driver.go11
-rw-r--r--plugins/kv/drivers/redis/driver.go13
-rw-r--r--plugins/kv/interface.go4
-rw-r--r--plugins/kv/rpc.go46
-rw-r--r--plugins/websockets/rpc.go14
7 files changed, 60 insertions, 53 deletions
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index ba873513..253b9d33 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -162,7 +162,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
return val, nil
}
-func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("boltdb_driver_mget")
// defense
if keys == nil {
@@ -177,7 +177,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string][]byte, len(keys))
err := d.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket(d.bucket)
@@ -186,7 +186,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
buf := new(bytes.Buffer)
- var out string
+ var out []byte
buf.Grow(100)
for i := range keys {
value := b.Get([]byte(keys[i]))
@@ -200,7 +200,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
m[keys[i]] = out
buf.Reset()
- out = ""
+ out = nil
}
}
@@ -241,8 +241,8 @@ func (d *Driver) Set(items ...*kvv1.Item) error {
// performance note: pass a prepared bytes slice with initial cap
// we can't move buf and gob out of loop, because we need to clear both from data
// but gob will contain (w/o re-init) the past data
- buf := bytes.Buffer{}
- encoder := gob.NewEncoder(&buf)
+ buf := new(bytes.Buffer)
+ encoder := gob.NewEncoder(buf)
if errors.Is(errors.EmptyItem, err) {
return errors.E(op, errors.EmptyItem)
}
@@ -342,7 +342,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error {
return nil
}
-func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("boltdb_driver_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -356,7 +356,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string]string, len(keys))
for i := range keys {
if item, ok := d.gc.Load(keys[i]); ok {
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 8ea515d0..c1f79cbb 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -97,7 +96,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
// MGet return map with key -- string
// and map value as value -- []byte
-func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("memcached_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -111,7 +110,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string][]byte, len(keys))
for i := range keys {
// Here also MultiGet
data, err := d.client.Get(keys[i])
@@ -150,7 +149,7 @@ func (d *Driver) Set(items ...*kvv1.Item) error {
memcachedItem := &memcache.Item{
Key: items[i].Key,
// unsafe convert
- Value: utils.AsBytes(items[i].Value),
+ Value: items[i].Value,
Flags: 0,
}
@@ -206,7 +205,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error {
}
// TTL return time in seconds (int32) for a given keys
-func (d *Driver) TTL(_ ...string) (map[string]interface{}, error) {
+func (d *Driver) TTL(_ ...string) (map[string]string, error) {
const op = errors.Op("memcached_plugin_ttl")
return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
}
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index 3158adee..9b7d7259 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -72,12 +71,12 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(*kvv1.Item).Value), nil
+ return data.(*kvv1.Item).Value, nil
}
return nil, nil
}
-func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("in_memory_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -91,7 +90,7 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string][]byte, len(keys))
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
@@ -160,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
return nil
}
-func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+func (s *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("in_memory_plugin_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -174,7 +173,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string]string, len(keys))
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index 0aaa6352..66cb8384 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -11,6 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -101,7 +102,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
// MGet loads content of multiple values (some values might be skipped).
// https://redis.io/commands/mget
// Returns slice with the interfaces with values
-func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("redis_driver_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -115,7 +116,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string][]byte, len(keys))
for _, k := range keys {
cmd := d.universalClient.Get(context.Background(), k)
@@ -126,7 +127,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
return nil, errors.E(op, cmd.Err())
}
- m[k] = cmd.Val()
+ m[k] = utils.AsBytes(cmd.Val())
}
return m, nil
@@ -213,7 +214,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error {
// TTL https://redis.io/commands/ttl
// return time in seconds (float64) for a given keys
-func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("redis_driver_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -227,7 +228,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string]string, len(keys))
for _, key := range keys {
duration, err := d.universalClient.TTL(context.Background(), key).Result()
@@ -235,7 +236,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
return nil, err
}
- m[key] = duration.Seconds()
+ m[key] = duration.String()
}
return m, nil
}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 7841f9a2..744c6b51 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -12,7 +12,7 @@ type Storage interface {
// MGet loads content of multiple values
// Returns the map with existing keys and associated values
- MGet(keys ...string) (map[string]interface{}, error)
+ MGet(keys ...string) (map[string][]byte, error)
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
@@ -23,7 +23,7 @@ type Storage interface {
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
- TTL(keys ...string) (map[string]interface{}, error)
+ TTL(keys ...string) (map[string]string, error)
// Delete one or multiple keys.
Delete(keys ...string) error
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 557d3ee1..ab1f7f31 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -17,7 +17,7 @@ type rpc struct {
}
// Has accept []*kvv1.Payload proto payload with Storage and Item
-func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
+func (r *rpc) Has(in *kvv1.Request, out *kvv1.Response) error {
const op = errors.Op("rpc_has")
if in.GetStorage() == "" {
@@ -38,7 +38,12 @@ func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
// update the value in the pointer
// save the result
- *res = ret
+ out.Items = make([]*kvv1.Item, 0, len(ret))
+ for k := range ret {
+ out.Items = append(out.Items, &kvv1.Item{
+ Key: k,
+ })
+ }
return nil
}
@@ -46,7 +51,7 @@ func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
}
// Set accept proto payload with Storage and Item
-func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
+func (r *rpc) Set(in *kvv1.Request, _ *kvv1.Response) error {
const op = errors.Op("rpc_set")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -56,16 +61,14 @@ func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
}
// save the result
- *ok = true
return nil
}
- *ok = false
return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
// MGet accept proto payload with Storage and Item
-func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
+func (r *rpc) MGet(in *kvv1.Request, out *kvv1.Response) error {
const op = errors.Op("rpc_mget")
keys := make([]string, 0, len(in.GetItems()))
@@ -80,8 +83,13 @@ func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
return errors.E(op, err)
}
- // save the result
- *res = ret
+ out.Items = make([]*kvv1.Item, 0, len(ret))
+ for k := range ret {
+ out.Items = append(out.Items, &kvv1.Item{
+ Key: k,
+ Value: ret[k],
+ })
+ }
return nil
}
@@ -89,7 +97,7 @@ func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
}
// MExpire accept proto payload with Storage and Item
-func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
+func (r *rpc) MExpire(in *kvv1.Request, _ *kvv1.Response) error {
const op = errors.Op("rpc_mexpire")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -98,17 +106,14 @@ func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
return errors.E(op, err)
}
- // save the result
- *ok = true
return nil
}
- *ok = false
return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
// TTL accept proto payload with Storage and Item
-func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
+func (r *rpc) TTL(in *kvv1.Request, out *kvv1.Response) error {
const op = errors.Op("rpc_ttl")
keys := make([]string, 0, len(in.GetItems()))
@@ -122,8 +127,14 @@ func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
return errors.E(op, err)
}
- // save the result
- *res = ret
+ out.Items = make([]*kvv1.Item, 0, len(ret))
+ for k := range ret {
+ out.Items = append(out.Items, &kvv1.Item{
+ Key: k,
+ Timeout: ret[k],
+ })
+ }
+
return nil
}
@@ -131,7 +142,7 @@ func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
}
// Delete accept proto payload with Storage and Item
-func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
+func (r *rpc) Delete(in *kvv1.Request, _ *kvv1.Response) error {
const op = errors.Op("rcp_delete")
keys := make([]string, 0, len(in.GetItems()))
@@ -145,11 +156,8 @@ func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
return errors.E(op, err)
}
- // save the result
- *ok = true
return nil
}
- *ok = false
return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 00c1dd91..80697fa2 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -15,12 +15,12 @@ type rpc struct {
// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
+func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
const op = errors.Op("broadcast_publish")
// just return in case of nil message
if in == nil {
- *ok = true
+ out.Ok = false
return nil
}
@@ -36,23 +36,23 @@ func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
err = r.plugin.Publish(bb)
if err != nil {
- *ok = false
+ out.Ok = false
return errors.E(op, err)
}
}
- *ok = true
+ out.Ok = false
return nil
}
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
+func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
const op = errors.Op("publish_async")
// just return in case of nil message
if in == nil {
- *ok = true
+ out.Ok = false
return nil
}
@@ -69,6 +69,6 @@ func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
r.plugin.PublishAsync(bb)
}
- *ok = true
+ out.Ok = false
return nil
}