summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-09 20:26:18 +0300
committerGitHub <[email protected]>2021-06-09 20:26:18 +0300
commitb99bfbe21a0f44b1a16b9110d779719fc637127c (patch)
tree3aabdb96c86a59325d816ad64cabc967ef2c8f10
parent8fdf05d4f360a9f6344141b273eab9d6859470e0 (diff)
parent7665167623147403d575b7e2cf125073cbe6584d (diff)
#715 feat(protocol): use protobuf for the `kv` and `websockets` RPC callsv2.3.0-beta.3
#715 feat(protocol): use protobuf for the `kv` and `websockets` RPC calls
-rw-r--r--CHANGELOG.md15
-rw-r--r--go.mod4
-rw-r--r--go.sum14
-rw-r--r--pkg/proto/kv/v1beta/kv.pb.go123
-rw-r--r--pkg/proto/kv/v1beta/kv.proto9
-rw-r--r--pkg/proto/websockets/v1beta/websockets.pb.go104
-rw-r--r--pkg/proto/websockets/v1beta/websockets.proto8
-rw-r--r--plugins/kv/drivers/boltdb/driver.go16
-rw-r--r--plugins/kv/drivers/memcached/driver.go9
-rw-r--r--plugins/kv/drivers/memory/driver.go11
-rw-r--r--plugins/kv/drivers/redis/driver.go13
-rw-r--r--plugins/kv/interface.go4
-rw-r--r--plugins/kv/rpc.go46
-rw-r--r--plugins/websockets/rpc.go14
-rw-r--r--tests/plugins/kv/storage_plugin_test.go325
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go34
16 files changed, 429 insertions, 320 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fa193010..38290d70 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,7 +9,7 @@ v2.3.0 (08.06.2021)
- ✏️ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of
thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus
on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513)
-- ✏️ Protobuf binary messages for the `websockets` RPC calls under the hood. [Issue](https://github.com/spiral/roadrunner/issues/711)
+- ✏️ Protobuf binary messages for the `websockets` and `kv` RPC calls under the hood. [Issue](https://github.com/spiral/roadrunner/issues/711)
- ✏️ Json-schemas for the config file v1.0 (it also registered in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614))
- ✏️ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead)
- ✏️ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error code. [Issue](https://github.com/spiral/roadrunner/issues/659)
@@ -20,12 +20,13 @@ v2.3.0 (08.06.2021)
## 🩹 Fixes:
- 🐛 Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686)
-- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in
- logs: [Bug](https://github.com/spiral/roadrunner/issues/659)
-- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob`
- error: [Bug](https://github.com/spiral/roadrunner/issues/691)
-- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the
- NPE: [Bug](https://github.com/spiral/roadrunner/issues/701)
+- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in logs: [Bug](https://github.com/spiral/roadrunner/issues/659)
+- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob` error: [Bug](https://github.com/spiral/roadrunner/issues/691)
+- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the NPE: [Bug](https://github.com/spiral/roadrunner/issues/701)
+
+## 📦 Packages:
+
+- 📦 Update goridge to `v3.1.0`
---
diff --git a/go.mod b/go.mod
index 4142d9b3..ed6eed96 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ require (
// SPIRAL ====
github.com/spiral/endure v1.0.1
github.com/spiral/errors v1.0.11
- github.com/spiral/goridge/v3 v3.0.1
+ github.com/spiral/goridge/v3 v3.1.0
// ===========
github.com/stretchr/testify v1.7.0
github.com/tklauser/go-sysconf v0.3.4 // indirect
@@ -35,6 +35,6 @@ require (
golang.org/x/net v0.0.0-20210226101413-39120d07d75e
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015
- google.golang.org/protobuf v1.23.0
+ google.golang.org/protobuf v1.26.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
diff --git a/go.sum b/go.sum
index 3edc27a7..69e4c84d 100644
--- a/go.sum
+++ b/go.sum
@@ -142,8 +142,9 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
-github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
+github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -385,12 +386,12 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/spiral/endure v1.0.1 h1:JHXHHPDiet5Cfx8i2KiC+ayqACmK5Sw0fxNE/QpIuWM=
github.com/spiral/endure v1.0.1/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE=
-github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.9/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/errors v1.0.10/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/errors v1.0.11 h1:TGG+t3mNouLuRW54Ph7nHo4X3u4WhbxqEQmnIybi7Go=
github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
-github.com/spiral/goridge/v3 v3.0.1 h1:mWo6hVEDJV3nRwsszx9y262CtrLQNojbONF4ikvKCBg=
-github.com/spiral/goridge/v3 v3.0.1/go.mod h1:rYfsBwigGneLgYJTIh5urotnH63I5O+p6ZcVq7xc1lY=
+github.com/spiral/goridge/v3 v3.1.0 h1:zbNCQaLj+QyM1e/uQP2RHbiZSlxJJvuLAE7GGA3EeYo=
+github.com/spiral/goridge/v3 v3.1.0/go.mod h1:yL47qBzJ1ic89PpfROyTnLjR9XplaEmvXhnJSrooeGw=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
@@ -399,7 +400,6 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
-github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
@@ -657,8 +657,10 @@ google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
-google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
+google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/pkg/proto/kv/v1beta/kv.pb.go b/pkg/proto/kv/v1beta/kv.pb.go
index 76450869..622967b8 100644
--- a/pkg/proto/kv/v1beta/kv.pb.go
+++ b/pkg/proto/kv/v1beta/kv.pb.go
@@ -20,7 +20,7 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-type Payload struct {
+type Request struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
@@ -30,8 +30,8 @@ type Payload struct {
Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
}
-func (x *Payload) Reset() {
- *x = Payload{}
+func (x *Request) Reset() {
+ *x = Request{}
if protoimpl.UnsafeEnabled {
mi := &file_kv_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -39,13 +39,13 @@ func (x *Payload) Reset() {
}
}
-func (x *Payload) String() string {
+func (x *Request) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*Payload) ProtoMessage() {}
+func (*Request) ProtoMessage() {}
-func (x *Payload) ProtoReflect() protoreflect.Message {
+func (x *Request) ProtoReflect() protoreflect.Message {
mi := &file_kv_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -57,19 +57,19 @@ func (x *Payload) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
-// Deprecated: Use Payload.ProtoReflect.Descriptor instead.
-func (*Payload) Descriptor() ([]byte, []int) {
+// Deprecated: Use Request.ProtoReflect.Descriptor instead.
+func (*Request) Descriptor() ([]byte, []int) {
return file_kv_proto_rawDescGZIP(), []int{0}
}
-func (x *Payload) GetStorage() string {
+func (x *Request) GetStorage() string {
if x != nil {
return x.Storage
}
return ""
}
-func (x *Payload) GetItems() []*Item {
+func (x *Request) GetItems() []*Item {
if x != nil {
return x.Items
}
@@ -82,7 +82,7 @@ type Item struct {
unknownFields protoimpl.UnknownFields
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
- Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+ Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
// RFC 3339
Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"`
}
@@ -126,11 +126,11 @@ func (x *Item) GetKey() string {
return ""
}
-func (x *Item) GetValue() string {
+func (x *Item) GetValue() []byte {
if x != nil {
return x.Value
}
- return ""
+ return nil
}
func (x *Item) GetTimeout() string {
@@ -140,22 +140,73 @@ func (x *Item) GetTimeout() string {
return ""
}
+// KV response for the KV RPC methods
+type Response struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Items []*Item `protobuf:"bytes,1,rep,name=items,proto3" json:"items,omitempty"`
+}
+
+func (x *Response) Reset() {
+ *x = Response{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_kv_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Response) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Response) ProtoMessage() {}
+
+func (x *Response) ProtoReflect() protoreflect.Message {
+ mi := &file_kv_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Response.ProtoReflect.Descriptor instead.
+func (*Response) Descriptor() ([]byte, []int) {
+ return file_kv_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *Response) GetItems() []*Item {
+ if x != nil {
+ return x.Items
+ }
+ return nil
+}
+
var File_kv_proto protoreflect.FileDescriptor
var file_kv_proto_rawDesc = []byte{
0x0a, 0x08, 0x6b, 0x76, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x6b, 0x76, 0x2e, 0x76,
- 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x4a, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x4a, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x69, 0x74,
0x65, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6b, 0x76, 0x2e, 0x76,
0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d,
0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76,
- 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
+ 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01,
- 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0d, 0x5a, 0x0b, 0x2e,
- 0x2f, 0x3b, 0x6b, 0x76, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
- 0x6f, 0x33,
+ 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x31, 0x0a, 0x08, 0x52,
+ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73,
+ 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6b, 0x76, 0x2e, 0x76, 0x31, 0x62, 0x65,
+ 0x74, 0x61, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x42, 0x0d,
+ 0x5a, 0x0b, 0x2e, 0x2f, 0x3b, 0x6b, 0x76, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -170,18 +221,20 @@ func file_kv_proto_rawDescGZIP() []byte {
return file_kv_proto_rawDescData
}
-var file_kv_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_kv_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_kv_proto_goTypes = []interface{}{
- (*Payload)(nil), // 0: kv.v1beta.Payload
- (*Item)(nil), // 1: kv.v1beta.Item
+ (*Request)(nil), // 0: kv.v1beta.Request
+ (*Item)(nil), // 1: kv.v1beta.Item
+ (*Response)(nil), // 2: kv.v1beta.Response
}
var file_kv_proto_depIdxs = []int32{
- 1, // 0: kv.v1beta.Payload.items:type_name -> kv.v1beta.Item
- 1, // [1:1] is the sub-list for method output_type
- 1, // [1:1] is the sub-list for method input_type
- 1, // [1:1] is the sub-list for extension type_name
- 1, // [1:1] is the sub-list for extension extendee
- 0, // [0:1] is the sub-list for field type_name
+ 1, // 0: kv.v1beta.Request.items:type_name -> kv.v1beta.Item
+ 1, // 1: kv.v1beta.Response.items:type_name -> kv.v1beta.Item
+ 2, // [2:2] is the sub-list for method output_type
+ 2, // [2:2] is the sub-list for method input_type
+ 2, // [2:2] is the sub-list for extension type_name
+ 2, // [2:2] is the sub-list for extension extendee
+ 0, // [0:2] is the sub-list for field type_name
}
func init() { file_kv_proto_init() }
@@ -191,7 +244,7 @@ func file_kv_proto_init() {
}
if !protoimpl.UnsafeEnabled {
file_kv_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Payload); i {
+ switch v := v.(*Request); i {
case 0:
return &v.state
case 1:
@@ -214,6 +267,18 @@ func file_kv_proto_init() {
return nil
}
}
+ file_kv_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Response); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -221,7 +286,7 @@ func file_kv_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_kv_proto_rawDesc,
NumEnums: 0,
- NumMessages: 2,
+ NumMessages: 3,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/pkg/proto/kv/v1beta/kv.proto b/pkg/proto/kv/v1beta/kv.proto
index 1ec0e6b7..1e3b8177 100644
--- a/pkg/proto/kv/v1beta/kv.proto
+++ b/pkg/proto/kv/v1beta/kv.proto
@@ -3,7 +3,7 @@ syntax = "proto3";
package kv.v1beta;
option go_package = "./;kvv1beta";
-message Payload {
+message Request {
// could be an enum in the future
string storage = 1;
repeated Item items = 2;
@@ -11,7 +11,12 @@ message Payload {
message Item {
string key = 1;
- string value = 2;
+ bytes value = 2;
// RFC 3339
string timeout = 3;
}
+
+// KV response for the KV RPC methods
+message Response {
+ repeated Item items = 1;
+}
diff --git a/pkg/proto/websockets/v1beta/websockets.pb.go b/pkg/proto/websockets/v1beta/websockets.pb.go
index f04c3cb2..d39b55da 100644
--- a/pkg/proto/websockets/v1beta/websockets.pb.go
+++ b/pkg/proto/websockets/v1beta/websockets.pb.go
@@ -91,7 +91,7 @@ func (x *Message) GetPayload() []byte {
return nil
}
-type Messages struct {
+type Request struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
@@ -99,8 +99,8 @@ type Messages struct {
Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
}
-func (x *Messages) Reset() {
- *x = Messages{}
+func (x *Request) Reset() {
+ *x = Request{}
if protoimpl.UnsafeEnabled {
mi := &file_websockets_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -108,13 +108,13 @@ func (x *Messages) Reset() {
}
}
-func (x *Messages) String() string {
+func (x *Request) String() string {
return protoimpl.X.MessageStringOf(x)
}
-func (*Messages) ProtoMessage() {}
+func (*Request) ProtoMessage() {}
-func (x *Messages) ProtoReflect() protoreflect.Message {
+func (x *Request) ProtoReflect() protoreflect.Message {
mi := &file_websockets_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@@ -126,18 +126,65 @@ func (x *Messages) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
-// Deprecated: Use Messages.ProtoReflect.Descriptor instead.
-func (*Messages) Descriptor() ([]byte, []int) {
+// Deprecated: Use Request.ProtoReflect.Descriptor instead.
+func (*Request) Descriptor() ([]byte, []int) {
return file_websockets_proto_rawDescGZIP(), []int{1}
}
-func (x *Messages) GetMessages() []*Message {
+func (x *Request) GetMessages() []*Message {
if x != nil {
return x.Messages
}
return nil
}
+type Response struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Ok bool `protobuf:"varint,1,opt,name=ok,proto3" json:"ok,omitempty"`
+}
+
+func (x *Response) Reset() {
+ *x = Response{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_websockets_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Response) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Response) ProtoMessage() {}
+
+func (x *Response) ProtoReflect() protoreflect.Message {
+ mi := &file_websockets_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Response.ProtoReflect.Descriptor instead.
+func (*Response) Descriptor() ([]byte, []int) {
+ return file_websockets_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *Response) GetOk() bool {
+ if x != nil {
+ return x.Ok
+ }
+ return false
+}
+
var File_websockets_proto protoreflect.FileDescriptor
var file_websockets_proto_rawDesc = []byte{
@@ -150,13 +197,15 @@ var file_websockets_proto_rawDesc = []byte{
0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x03,
0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79,
- 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x42, 0x0a, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73,
- 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03,
- 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e,
- 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08,
- 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77,
- 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62,
- 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x41, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
+ 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28,
+ 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d,
+ 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x1a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f,
+ 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52,
+ 0x02, 0x6f, 0x6b, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63,
+ 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x33,
}
var (
@@ -171,13 +220,14 @@ func file_websockets_proto_rawDescGZIP() []byte {
return file_websockets_proto_rawDescData
}
-var file_websockets_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_websockets_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_websockets_proto_goTypes = []interface{}{
(*Message)(nil), // 0: websockets.v1beta.Message
- (*Messages)(nil), // 1: websockets.v1beta.Messages
+ (*Request)(nil), // 1: websockets.v1beta.Request
+ (*Response)(nil), // 2: websockets.v1beta.Response
}
var file_websockets_proto_depIdxs = []int32{
- 0, // 0: websockets.v1beta.Messages.messages:type_name -> websockets.v1beta.Message
+ 0, // 0: websockets.v1beta.Request.messages:type_name -> websockets.v1beta.Message
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
@@ -204,7 +254,19 @@ func file_websockets_proto_init() {
}
}
file_websockets_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Messages); i {
+ switch v := v.(*Request); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_websockets_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Response); i {
case 0:
return &v.state
case 1:
@@ -222,7 +284,7 @@ func file_websockets_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_websockets_proto_rawDesc,
NumEnums: 0,
- NumMessages: 2,
+ NumMessages: 3,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/pkg/proto/websockets/v1beta/websockets.proto b/pkg/proto/websockets/v1beta/websockets.proto
index a61da93d..ede3cde9 100644
--- a/pkg/proto/websockets/v1beta/websockets.proto
+++ b/pkg/proto/websockets/v1beta/websockets.proto
@@ -10,6 +10,12 @@ message Message {
bytes payload = 4;
}
-message Messages {
+// RPC request with messages
+message Request {
repeated Message messages = 1;
}
+
+// RPC response (false in case of error)
+message Response {
+ bool ok = 1;
+}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index ba873513..253b9d33 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -162,7 +162,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
return val, nil
}
-func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("boltdb_driver_mget")
// defense
if keys == nil {
@@ -177,7 +177,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string][]byte, len(keys))
err := d.DB.View(func(tx *bolt.Tx) error {
b := tx.Bucket(d.bucket)
@@ -186,7 +186,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
buf := new(bytes.Buffer)
- var out string
+ var out []byte
buf.Grow(100)
for i := range keys {
value := b.Get([]byte(keys[i]))
@@ -200,7 +200,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
m[keys[i]] = out
buf.Reset()
- out = ""
+ out = nil
}
}
@@ -241,8 +241,8 @@ func (d *Driver) Set(items ...*kvv1.Item) error {
// performance note: pass a prepared bytes slice with initial cap
// we can't move buf and gob out of loop, because we need to clear both from data
// but gob will contain (w/o re-init) the past data
- buf := bytes.Buffer{}
- encoder := gob.NewEncoder(&buf)
+ buf := new(bytes.Buffer)
+ encoder := gob.NewEncoder(buf)
if errors.Is(errors.EmptyItem, err) {
return errors.E(op, errors.EmptyItem)
}
@@ -342,7 +342,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error {
return nil
}
-func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("boltdb_driver_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -356,7 +356,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string]string, len(keys))
for i := range keys {
if item, ok := d.gc.Load(keys[i]); ok {
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 8ea515d0..c1f79cbb 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -97,7 +96,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
// MGet return map with key -- string
// and map value as value -- []byte
-func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("memcached_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -111,7 +110,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string][]byte, len(keys))
for i := range keys {
// Here also MultiGet
data, err := d.client.Get(keys[i])
@@ -150,7 +149,7 @@ func (d *Driver) Set(items ...*kvv1.Item) error {
memcachedItem := &memcache.Item{
Key: items[i].Key,
// unsafe convert
- Value: utils.AsBytes(items[i].Value),
+ Value: items[i].Value,
Flags: 0,
}
@@ -206,7 +205,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error {
}
// TTL return time in seconds (int32) for a given keys
-func (d *Driver) TTL(_ ...string) (map[string]interface{}, error) {
+func (d *Driver) TTL(_ ...string) (map[string]string, error) {
const op = errors.Op("memcached_plugin_ttl")
return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239"))
}
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index 3158adee..9b7d7259 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -72,12 +71,12 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(*kvv1.Item).Value), nil
+ return data.(*kvv1.Item).Value, nil
}
return nil, nil
}
-func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+func (s *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("in_memory_plugin_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -91,7 +90,7 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string][]byte, len(keys))
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
@@ -160,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error {
return nil
}
-func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+func (s *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("in_memory_plugin_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -174,7 +173,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string]string, len(keys))
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index 0aaa6352..66cb8384 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -11,6 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/utils"
)
type Driver struct {
@@ -101,7 +102,7 @@ func (d *Driver) Get(key string) ([]byte, error) {
// MGet loads content of multiple values (some values might be skipped).
// https://redis.io/commands/mget
// Returns slice with the interfaces with values
-func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) MGet(keys ...string) (map[string][]byte, error) {
const op = errors.Op("redis_driver_mget")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -115,7 +116,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string][]byte, len(keys))
for _, k := range keys {
cmd := d.universalClient.Get(context.Background(), k)
@@ -126,7 +127,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
return nil, errors.E(op, cmd.Err())
}
- m[k] = cmd.Val()
+ m[k] = utils.AsBytes(cmd.Val())
}
return m, nil
@@ -213,7 +214,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error {
// TTL https://redis.io/commands/ttl
// return time in seconds (float64) for a given keys
-func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
+func (d *Driver) TTL(keys ...string) (map[string]string, error) {
const op = errors.Op("redis_driver_ttl")
if keys == nil {
return nil, errors.E(op, errors.NoKeys)
@@ -227,7 +228,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
}
}
- m := make(map[string]interface{}, len(keys))
+ m := make(map[string]string, len(keys))
for _, key := range keys {
duration, err := d.universalClient.TTL(context.Background(), key).Result()
@@ -235,7 +236,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) {
return nil, err
}
- m[key] = duration.Seconds()
+ m[key] = duration.String()
}
return m, nil
}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 7841f9a2..744c6b51 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -12,7 +12,7 @@ type Storage interface {
// MGet loads content of multiple values
// Returns the map with existing keys and associated values
- MGet(keys ...string) (map[string]interface{}, error)
+ MGet(keys ...string) (map[string][]byte, error)
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
@@ -23,7 +23,7 @@ type Storage interface {
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
- TTL(keys ...string) (map[string]interface{}, error)
+ TTL(keys ...string) (map[string]string, error)
// Delete one or multiple keys.
Delete(keys ...string) error
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 557d3ee1..ab1f7f31 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -17,7 +17,7 @@ type rpc struct {
}
// Has accept []*kvv1.Payload proto payload with Storage and Item
-func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
+func (r *rpc) Has(in *kvv1.Request, out *kvv1.Response) error {
const op = errors.Op("rpc_has")
if in.GetStorage() == "" {
@@ -38,7 +38,12 @@ func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
// update the value in the pointer
// save the result
- *res = ret
+ out.Items = make([]*kvv1.Item, 0, len(ret))
+ for k := range ret {
+ out.Items = append(out.Items, &kvv1.Item{
+ Key: k,
+ })
+ }
return nil
}
@@ -46,7 +51,7 @@ func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
}
// Set accept proto payload with Storage and Item
-func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
+func (r *rpc) Set(in *kvv1.Request, _ *kvv1.Response) error {
const op = errors.Op("rpc_set")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -56,16 +61,14 @@ func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
}
// save the result
- *ok = true
return nil
}
- *ok = false
return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
// MGet accept proto payload with Storage and Item
-func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
+func (r *rpc) MGet(in *kvv1.Request, out *kvv1.Response) error {
const op = errors.Op("rpc_mget")
keys := make([]string, 0, len(in.GetItems()))
@@ -80,8 +83,13 @@ func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
return errors.E(op, err)
}
- // save the result
- *res = ret
+ out.Items = make([]*kvv1.Item, 0, len(ret))
+ for k := range ret {
+ out.Items = append(out.Items, &kvv1.Item{
+ Key: k,
+ Value: ret[k],
+ })
+ }
return nil
}
@@ -89,7 +97,7 @@ func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
}
// MExpire accept proto payload with Storage and Item
-func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
+func (r *rpc) MExpire(in *kvv1.Request, _ *kvv1.Response) error {
const op = errors.Op("rpc_mexpire")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -98,17 +106,14 @@ func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
return errors.E(op, err)
}
- // save the result
- *ok = true
return nil
}
- *ok = false
return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
// TTL accept proto payload with Storage and Item
-func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
+func (r *rpc) TTL(in *kvv1.Request, out *kvv1.Response) error {
const op = errors.Op("rpc_ttl")
keys := make([]string, 0, len(in.GetItems()))
@@ -122,8 +127,14 @@ func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
return errors.E(op, err)
}
- // save the result
- *res = ret
+ out.Items = make([]*kvv1.Item, 0, len(ret))
+ for k := range ret {
+ out.Items = append(out.Items, &kvv1.Item{
+ Key: k,
+ Timeout: ret[k],
+ })
+ }
+
return nil
}
@@ -131,7 +142,7 @@ func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
}
// Delete accept proto payload with Storage and Item
-func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
+func (r *rpc) Delete(in *kvv1.Request, _ *kvv1.Response) error {
const op = errors.Op("rcp_delete")
keys := make([]string, 0, len(in.GetItems()))
@@ -145,11 +156,8 @@ func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
return errors.E(op, err)
}
- // save the result
- *ok = true
return nil
}
- *ok = false
return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 00c1dd91..80697fa2 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -15,12 +15,12 @@ type rpc struct {
// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
+func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
const op = errors.Op("broadcast_publish")
// just return in case of nil message
if in == nil {
- *ok = true
+ out.Ok = false
return nil
}
@@ -36,23 +36,23 @@ func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
err = r.plugin.Publish(bb)
if err != nil {
- *ok = false
+ out.Ok = false
return errors.E(op, err)
}
}
- *ok = true
+ out.Ok = false
return nil
}
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
+func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
const op = errors.Op("publish_async")
// just return in case of nil message
if in == nil {
- *ok = true
+ out.Ok = false
return nil
}
@@ -69,6 +69,6 @@ func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
r.plugin.PublishAsync(bb)
}
- *ok = true
+ out.Ok = false
return nil
}
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index 760b6951..fd8a58cf 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -107,21 +107,19 @@ func kvSetTest(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
- p := &payload.Payload{
+ p := &payload.Request{
Storage: "boltdb-south",
Items: []*payload.Item{
{
Key: "key",
- Value: "val",
+ Value: []byte("val"),
},
},
}
- var ok bool
-
- err = client.Call("kv.Set", p, &ok)
+ resp := &payload.Response{}
+ err = client.Call("kv.Set", p, resp)
assert.NoError(t, err)
- assert.True(t, ok, "Set return result")
}
func kvHasTest(t *testing.T) {
@@ -129,20 +127,20 @@ func kvHasTest(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
- p := &payload.Payload{
+ p := &payload.Request{
Storage: "boltdb-south",
Items: []*payload.Item{
{
Key: "key",
- Value: "val",
+ Value: []byte("val"),
},
},
}
- var ret map[string]bool
-
- err = client.Call("kv.Has", p, &ret)
+ ret := &payload.Response{}
+ err = client.Call("kv.Has", p, ret)
assert.NoError(t, err)
+ assert.Len(t, ret.GetItems(), 1)
}
func TestBoltDb(t *testing.T) {
@@ -221,8 +219,7 @@ func testRPCMethods(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
-
- keys := &payload.Payload{
+ keys := &payload.Request{
Storage: "boltdb-rr",
Items: []*payload.Item{
{
@@ -237,63 +234,59 @@ func testRPCMethods(t *testing.T) {
},
}
- data := &payload.Payload{
+ data := &payload.Request{
Storage: "boltdb-rr",
Items: []*payload.Item{
{
Key: "a",
- Value: "aa",
+ Value: []byte("aa"),
},
{
Key: "b",
- Value: "bb",
+ Value: []byte("bb"),
},
{
Key: "c",
- Value: "cc",
+ Value: []byte("cc"),
Timeout: tt,
},
{
Key: "d",
- Value: "dd",
+ Value: []byte("dd"),
},
{
Key: "e",
- Value: "ee",
+ Value: []byte("ee"),
},
},
}
- var setRes bool
-
+ ret := &payload.Response{}
// Register 3 keys with values
- err = client.Call("kv.Set", data, &setRes)
+ err = client.Call("kv.Set", data, ret)
assert.NoError(t, err)
- assert.True(t, setRes)
- ret := make(map[string]bool)
- err = client.Call("kv.Has", keys, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 3) // should be 3
+ assert.Len(t, ret.GetItems(), 3) // should be 3
// key "c" should be deleted
time.Sleep(time.Second * 7)
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keys, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 2) // should be 2
+ assert.Len(t, ret.GetItems(), 2) // should be 2
- mGet := make(map[string]interface{})
- err = client.Call("kv.MGet", keys, &mGet)
+ ret = &payload.Response{}
+ err = client.Call("kv.MGet", keys, ret)
assert.NoError(t, err)
- assert.Len(t, mGet, 2) // c is expired
- assert.Equal(t, "aa", mGet["a"].(string))
- assert.Equal(t, "bb", mGet["b"].(string))
+ assert.Len(t, ret.GetItems(), 2) // c is expired
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := &payload.Payload{
+ data2 := &payload.Request{
Storage: "boltdb-rr",
Items: []*payload.Item{
{
@@ -312,13 +305,12 @@ func testRPCMethods(t *testing.T) {
}
// MEXPIRE
- var mExpRes bool
- err = client.Call("kv.MExpire", data2, &mExpRes)
+ ret = &payload.Response{}
+ err = client.Call("kv.MExpire", data2, ret)
assert.NoError(t, err)
- assert.True(t, mExpRes)
// TTL
- keys2 := &payload.Payload{
+ keys2 := &payload.Request{
Storage: "boltdb-rr",
Items: []*payload.Item{
{
@@ -333,20 +325,20 @@ func testRPCMethods(t *testing.T) {
},
}
- ttlRes := make(map[string]interface{})
- err = client.Call("kv.TTL", keys2, &ttlRes)
+ ret = &payload.Response{}
+ err = client.Call("kv.TTL", keys2, ret)
assert.NoError(t, err)
- assert.Len(t, ttlRes, 3)
+ assert.Len(t, ret.GetItems(), 3)
// HAS AFTER TTL
time.Sleep(time.Second * 15)
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keys2, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys2, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 0)
+ assert.Len(t, ret.GetItems(), 0)
// DELETE
- keysDel := &payload.Payload{
+ keysDel := &payload.Request{
Storage: "boltdb-rr",
Items: []*payload.Item{
{
@@ -355,16 +347,15 @@ func testRPCMethods(t *testing.T) {
},
}
- var delRet bool
- err = client.Call("kv.Delete", keysDel, &delRet)
+ ret = &payload.Response{}
+ err = client.Call("kv.Delete", keysDel, ret)
assert.NoError(t, err)
- assert.True(t, delRet)
// HAS AFTER DELETE
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keysDel, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 0)
+ assert.Len(t, ret.GetItems(), 0)
}
func TestMemcached(t *testing.T) {
@@ -429,7 +420,7 @@ func TestMemcached(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("testMemcachedRPCMethods", testRPCMethodsMemcached)
+ t.Run("MEMCACHED", testRPCMethodsMemcached)
stopCh <- struct{}{}
wg.Wait()
}
@@ -442,7 +433,7 @@ func testRPCMethodsMemcached(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := &payload.Payload{
+ keys := &payload.Request{
Storage: "memcached-rr",
Items: []*payload.Item{
{
@@ -457,63 +448,59 @@ func testRPCMethodsMemcached(t *testing.T) {
},
}
- data := &payload.Payload{
+ data := &payload.Request{
Storage: "memcached-rr",
Items: []*payload.Item{
{
Key: "a",
- Value: "aa",
+ Value: []byte("aa"),
},
{
Key: "b",
- Value: "bb",
+ Value: []byte("bb"),
},
{
Key: "c",
- Value: "cc",
+ Value: []byte("cc"),
Timeout: tt,
},
{
Key: "d",
- Value: "dd",
+ Value: []byte("dd"),
},
{
Key: "e",
- Value: "ee",
+ Value: []byte("ee"),
},
},
}
- var setRes bool
-
+ ret := &payload.Response{}
// Register 3 keys with values
- err = client.Call("kv.Set", data, &setRes)
+ err = client.Call("kv.Set", data, ret)
assert.NoError(t, err)
- assert.True(t, setRes)
- ret := make(map[string]bool)
- err = client.Call("kv.Has", keys, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 3) // should be 3
+ assert.Len(t, ret.GetItems(), 3) // should be 3
// key "c" should be deleted
time.Sleep(time.Second * 7)
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keys, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 2) // should be 2
+ assert.Len(t, ret.GetItems(), 2) // should be 2
- mGet := make(map[string]interface{})
- err = client.Call("kv.MGet", keys, &mGet)
+ ret = &payload.Response{}
+ err = client.Call("kv.MGet", keys, ret)
assert.NoError(t, err)
- assert.Len(t, mGet, 2) // c is expired
- assert.Equal(t, string("aa"), string(mGet["a"].([]byte)))
- assert.Equal(t, string("bb"), string(mGet["b"].([]byte)))
+ assert.Len(t, ret.GetItems(), 2) // c is expired
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := &payload.Payload{
+ data2 := &payload.Request{
Storage: "memcached-rr",
Items: []*payload.Item{
{
@@ -532,13 +519,12 @@ func testRPCMethodsMemcached(t *testing.T) {
}
// MEXPIRE
- var mExpRes bool
- err = client.Call("kv.MExpire", data2, &mExpRes)
+ ret = &payload.Response{}
+ err = client.Call("kv.MExpire", data2, ret)
assert.NoError(t, err)
- assert.True(t, mExpRes)
// TTL call is not supported for the memcached driver
- keys2 := &payload.Payload{
+ keys2 := &payload.Request{
Storage: "memcached-rr",
Items: []*payload.Item{
{
@@ -553,20 +539,20 @@ func testRPCMethodsMemcached(t *testing.T) {
},
}
- ttlRes := make(map[string]interface{})
- err = client.Call("kv.TTL", keys2, &ttlRes)
+ ret = &payload.Response{}
+ err = client.Call("kv.TTL", keys2, ret)
assert.Error(t, err)
- assert.Len(t, ttlRes, 0)
+ assert.Len(t, ret.GetItems(), 0)
// HAS AFTER TTL
time.Sleep(time.Second * 15)
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keys2, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys2, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 0)
+ assert.Len(t, ret.GetItems(), 0)
// DELETE
- keysDel := &payload.Payload{
+ keysDel := &payload.Request{
Storage: "memcached-rr",
Items: []*payload.Item{
{
@@ -575,16 +561,15 @@ func testRPCMethodsMemcached(t *testing.T) {
},
}
- var delRet bool
- err = client.Call("kv.Delete", keysDel, &delRet)
+ ret = &payload.Response{}
+ err = client.Call("kv.Delete", keysDel, ret)
assert.NoError(t, err)
- assert.True(t, delRet)
// HAS AFTER DELETE
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keysDel, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 0)
+ assert.Len(t, ret.GetItems(), 0)
}
func TestInMemory(t *testing.T) {
@@ -649,7 +634,7 @@ func TestInMemory(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("testInMemoryRPCMethods", testRPCMethodsInMemory)
+ t.Run("INMEMORY", testRPCMethodsInMemory)
stopCh <- struct{}{}
wg.Wait()
}
@@ -660,8 +645,9 @@ func testRPCMethodsInMemory(t *testing.T) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// add 5 second ttl
+
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := &payload.Payload{
+ keys := &payload.Request{
Storage: "memory-rr",
Items: []*payload.Item{
{
@@ -676,63 +662,59 @@ func testRPCMethodsInMemory(t *testing.T) {
},
}
- data := &payload.Payload{
+ data := &payload.Request{
Storage: "memory-rr",
Items: []*payload.Item{
{
Key: "a",
- Value: "aa",
+ Value: []byte("aa"),
},
{
Key: "b",
- Value: "bb",
+ Value: []byte("bb"),
},
{
Key: "c",
- Value: "cc",
+ Value: []byte("cc"),
Timeout: tt,
},
{
Key: "d",
- Value: "dd",
+ Value: []byte("dd"),
},
{
Key: "e",
- Value: "ee",
+ Value: []byte("ee"),
},
},
}
- var setRes bool
-
+ ret := &payload.Response{}
// Register 3 keys with values
- err = client.Call("kv.Set", data, &setRes)
+ err = client.Call("kv.Set", data, ret)
assert.NoError(t, err)
- assert.True(t, setRes)
- ret := make(map[string]bool)
- err = client.Call("kv.Has", keys, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 3) // should be 3
+ assert.Len(t, ret.GetItems(), 3) // should be 3
// key "c" should be deleted
time.Sleep(time.Second * 7)
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keys, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 2) // should be 2
+ assert.Len(t, ret.GetItems(), 2) // should be 2
- mGet := make(map[string]interface{})
- err = client.Call("kv.MGet", keys, &mGet)
+ ret = &payload.Response{}
+ err = client.Call("kv.MGet", keys, ret)
assert.NoError(t, err)
- assert.Len(t, mGet, 2) // c is expired
- assert.Equal(t, "aa", mGet["a"].(string))
- assert.Equal(t, "bb", mGet["b"].(string))
+ assert.Len(t, ret.GetItems(), 2) // c is expired
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := &payload.Payload{
+ data2 := &payload.Request{
Storage: "memory-rr",
Items: []*payload.Item{
{
@@ -751,13 +733,12 @@ func testRPCMethodsInMemory(t *testing.T) {
}
// MEXPIRE
- var mExpRes bool
- err = client.Call("kv.MExpire", data2, &mExpRes)
+ ret = &payload.Response{}
+ err = client.Call("kv.MExpire", data2, ret)
assert.NoError(t, err)
- assert.True(t, mExpRes)
// TTL
- keys2 := &payload.Payload{
+ keys2 := &payload.Request{
Storage: "memory-rr",
Items: []*payload.Item{
{
@@ -772,20 +753,20 @@ func testRPCMethodsInMemory(t *testing.T) {
},
}
- ttlRes := make(map[string]interface{})
- err = client.Call("kv.TTL", keys2, &ttlRes)
+ ret = &payload.Response{}
+ err = client.Call("kv.TTL", keys2, ret)
assert.NoError(t, err)
- assert.Len(t, ttlRes, 3)
+ assert.Len(t, ret.GetItems(), 3)
// HAS AFTER TTL
time.Sleep(time.Second * 15)
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keys2, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys2, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 0)
+ assert.Len(t, ret.GetItems(), 0)
// DELETE
- keysDel := &payload.Payload{
+ keysDel := &payload.Request{
Storage: "memory-rr",
Items: []*payload.Item{
{
@@ -794,16 +775,15 @@ func testRPCMethodsInMemory(t *testing.T) {
},
}
- var delRet bool
- err = client.Call("kv.Delete", keysDel, &delRet)
+ ret = &payload.Response{}
+ err = client.Call("kv.Delete", keysDel, ret)
assert.NoError(t, err)
- assert.True(t, delRet)
// HAS AFTER DELETE
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keysDel, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 0)
+ assert.Len(t, ret.GetItems(), 0)
}
func TestRedis(t *testing.T) {
@@ -868,7 +848,7 @@ func TestRedis(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("testRedisRPCMethods", testRPCMethodsRedis)
+ t.Run("REDIS", testRPCMethodsRedis)
stopCh <- struct{}{}
wg.Wait()
}
@@ -880,7 +860,7 @@ func testRPCMethodsRedis(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := &payload.Payload{
+ keys := &payload.Request{
Storage: "redis-rr",
Items: []*payload.Item{
{
@@ -895,62 +875,59 @@ func testRPCMethodsRedis(t *testing.T) {
},
}
- data := &payload.Payload{
+ data := &payload.Request{
Storage: "redis-rr",
Items: []*payload.Item{
{
Key: "a",
- Value: "aa",
+ Value: []byte("aa"),
},
{
Key: "b",
- Value: "bb",
+ Value: []byte("bb"),
},
{
Key: "c",
- Value: "cc",
+ Value: []byte("cc"),
Timeout: tt,
},
{
Key: "d",
- Value: "dd",
+ Value: []byte("dd"),
},
{
Key: "e",
- Value: "ee",
+ Value: []byte("ee"),
},
},
}
- var setRes bool
-
+ ret := &payload.Response{}
// Register 3 keys with values
- err = client.Call("kv.Set", data, &setRes)
+ err = client.Call("kv.Set", data, ret)
assert.NoError(t, err)
- assert.True(t, setRes)
- ret := make(map[string]bool)
- err = client.Call("kv.Has", keys, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 3) // should be 3
+ assert.Len(t, ret.GetItems(), 3) // should be 3
// key "c" should be deleted
time.Sleep(time.Second * 7)
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keys, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 2) // should be 2
+ assert.Len(t, ret.GetItems(), 2) // should be 2
- mGet := make(map[string]interface{})
- err = client.Call("kv.MGet", keys, &mGet)
+ ret = &payload.Response{}
+ err = client.Call("kv.MGet", keys, ret)
assert.NoError(t, err)
- assert.Len(t, mGet, 2) // c is expired
- assert.Equal(t, "aa", mGet["a"].(string))
- assert.Equal(t, "bb", mGet["b"].(string))
+ assert.Len(t, ret.GetItems(), 2) // c is expired
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := &payload.Payload{
+
+ data2 := &payload.Request{
Storage: "redis-rr",
Items: []*payload.Item{
{
@@ -969,13 +946,12 @@ func testRPCMethodsRedis(t *testing.T) {
}
// MEXPIRE
- var mExpRes bool
- err = client.Call("kv.MExpire", data2, &mExpRes)
+ ret = &payload.Response{}
+ err = client.Call("kv.MExpire", data2, ret)
assert.NoError(t, err)
- assert.True(t, mExpRes)
// TTL
- keys2 := &payload.Payload{
+ keys2 := &payload.Request{
Storage: "redis-rr",
Items: []*payload.Item{
{
@@ -990,20 +966,20 @@ func testRPCMethodsRedis(t *testing.T) {
},
}
- ttlRes := make(map[string]interface{})
- err = client.Call("kv.TTL", keys2, &ttlRes)
+ ret = &payload.Response{}
+ err = client.Call("kv.TTL", keys2, ret)
assert.NoError(t, err)
- assert.Len(t, ttlRes, 3)
+ assert.Len(t, ret.GetItems(), 3)
// HAS AFTER TTL
time.Sleep(time.Second * 15)
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keys2, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keys2, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 0)
+ assert.Len(t, ret.GetItems(), 0)
// DELETE
- keysDel := &payload.Payload{
+ keysDel := &payload.Request{
Storage: "redis-rr",
Items: []*payload.Item{
{
@@ -1012,14 +988,13 @@ func testRPCMethodsRedis(t *testing.T) {
},
}
- var delRet bool
- err = client.Call("kv.Delete", keysDel, &delRet)
+ ret = &payload.Response{}
+ err = client.Call("kv.Delete", keysDel, ret)
assert.NoError(t, err)
- assert.True(t, delRet)
// HAS AFTER DELETE
- ret = make(map[string]bool)
- err = client.Call("kv.Has", keysDel, &ret)
+ ret = &payload.Response{}
+ err = client.Call("kv.Has", keysDel, ret)
assert.NoError(t, err)
- assert.Len(t, ret, 0)
+ assert.Len(t, ret.GetItems(), 0)
}
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 4e4c09f1..c1fd5f3e 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -28,20 +28,6 @@ import (
"github.com/stretchr/testify/assert"
)
-type Msg struct {
- // Topic makeMessage been pushed into.
- Topics []string `json:"topic"`
-
- // Command (join, leave, headers)
- Command string `json:"command"`
-
- // Broker (redis, memory)
- Broker string `json:"broker"`
-
- // Payload to be broadcasted
- Payload []byte `json:"payload"`
-}
-
func TestBroadcastInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
@@ -819,8 +805,8 @@ func publish(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- var ret bool
- err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret)
+ ret := &websocketsv1.Response{}
+ err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret)
if err != nil {
panic(err)
}
@@ -834,8 +820,8 @@ func publishAsync(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- var ret bool
- err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret)
+ ret := &websocketsv1.Response{}
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret)
if err != nil {
panic(err)
}
@@ -849,8 +835,8 @@ func publishAsync2(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- var ret bool
- err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret)
+ ret := &websocketsv1.Response{}
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
if err != nil {
panic(err)
}
@@ -864,8 +850,8 @@ func publish2(command string, broker string, topics ...string) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- var ret bool
- err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret)
+ ret := &websocketsv1.Response{}
+ err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
if err != nil {
panic(err)
}
@@ -879,8 +865,8 @@ func messageWS(command string, broker string, payload []byte, topics ...string)
}
}
-func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Messages {
- m := &websocketsv1.Messages{
+func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request {
+ m := &websocketsv1.Request{
Messages: []*websocketsv1.Message{
{
Topics: topics,