diff options
author | Valery Piashchynski <[email protected]> | 2021-06-09 20:26:18 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-09 20:26:18 +0300 |
commit | b99bfbe21a0f44b1a16b9110d779719fc637127c (patch) | |
tree | 3aabdb96c86a59325d816ad64cabc967ef2c8f10 | |
parent | 8fdf05d4f360a9f6344141b273eab9d6859470e0 (diff) | |
parent | 7665167623147403d575b7e2cf125073cbe6584d (diff) |
#715 feat(protocol): use protobuf for the `kv` and `websockets` RPC callsv2.3.0-beta.3
#715 feat(protocol): use protobuf for the `kv` and `websockets` RPC calls
-rw-r--r-- | CHANGELOG.md | 15 | ||||
-rw-r--r-- | go.mod | 4 | ||||
-rw-r--r-- | go.sum | 14 | ||||
-rw-r--r-- | pkg/proto/kv/v1beta/kv.pb.go | 123 | ||||
-rw-r--r-- | pkg/proto/kv/v1beta/kv.proto | 9 | ||||
-rw-r--r-- | pkg/proto/websockets/v1beta/websockets.pb.go | 104 | ||||
-rw-r--r-- | pkg/proto/websockets/v1beta/websockets.proto | 8 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/driver.go | 16 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/driver.go | 9 | ||||
-rw-r--r-- | plugins/kv/drivers/memory/driver.go | 11 | ||||
-rw-r--r-- | plugins/kv/drivers/redis/driver.go | 13 | ||||
-rw-r--r-- | plugins/kv/interface.go | 4 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 46 | ||||
-rw-r--r-- | plugins/websockets/rpc.go | 14 | ||||
-rw-r--r-- | tests/plugins/kv/storage_plugin_test.go | 325 | ||||
-rw-r--r-- | tests/plugins/websockets/websocket_plugin_test.go | 34 |
16 files changed, 429 insertions, 320 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index fa193010..38290d70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ v2.3.0 (08.06.2021) - ✏️ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus on 2CPU cores and 1GB of RAM) [Issue](https://github.com/spiral/roadrunner/issues/513) -- ✏️ Protobuf binary messages for the `websockets` RPC calls under the hood. [Issue](https://github.com/spiral/roadrunner/issues/711) +- ✏️ Protobuf binary messages for the `websockets` and `kv` RPC calls under the hood. [Issue](https://github.com/spiral/roadrunner/issues/711) - ✏️ Json-schemas for the config file v1.0 (it also registered in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614)) - ✏️ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead) - ✏️ Add new option to the `http` config section: `internal_error_code` to override default (500) internal error code. [Issue](https://github.com/spiral/roadrunner/issues/659) @@ -20,12 +20,13 @@ v2.3.0 (08.06.2021) ## 🩹 Fixes: - 🐛 Fix: Bug with `informer.Workers` worked incorrectly: [Bug](https://github.com/spiral/roadrunner/issues/686) -- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in - logs: [Bug](https://github.com/spiral/roadrunner/issues/659) -- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob` - error: [Bug](https://github.com/spiral/roadrunner/issues/691) -- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the - NPE: [Bug](https://github.com/spiral/roadrunner/issues/701) +- 🐛 Fix: Internal error messages will not be shown to the user (except HTTP status code). Error message will be in logs: [Bug](https://github.com/spiral/roadrunner/issues/659) +- 🐛 Fix: Error message will be properly shown in the log in case of `SoftJob` error: [Bug](https://github.com/spiral/roadrunner/issues/691) +- 🐛 Fix: Wrong applied middlewares for the `fcgi` server leads to the NPE: [Bug](https://github.com/spiral/roadrunner/issues/701) + +## 📦 Packages: + +- 📦 Update goridge to `v3.1.0` --- @@ -22,7 +22,7 @@ require ( // SPIRAL ==== github.com/spiral/endure v1.0.1 github.com/spiral/errors v1.0.11 - github.com/spiral/goridge/v3 v3.0.1 + github.com/spiral/goridge/v3 v3.1.0 // =========== github.com/stretchr/testify v1.7.0 github.com/tklauser/go-sysconf v0.3.4 // indirect @@ -35,6 +35,6 @@ require ( golang.org/x/net v0.0.0-20210226101413-39120d07d75e golang.org/x/sync v0.0.0-20201207232520-09787c993a3a golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 - google.golang.org/protobuf v1.23.0 + google.golang.org/protobuf v1.26.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -142,8 +142,9 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -385,12 +386,12 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spiral/endure v1.0.1 h1:JHXHHPDiet5Cfx8i2KiC+ayqACmK5Sw0fxNE/QpIuWM= github.com/spiral/endure v1.0.1/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE= -github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.9/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= +github.com/spiral/errors v1.0.10/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= github.com/spiral/errors v1.0.11 h1:TGG+t3mNouLuRW54Ph7nHo4X3u4WhbxqEQmnIybi7Go= github.com/spiral/errors v1.0.11/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= -github.com/spiral/goridge/v3 v3.0.1 h1:mWo6hVEDJV3nRwsszx9y262CtrLQNojbONF4ikvKCBg= -github.com/spiral/goridge/v3 v3.0.1/go.mod h1:rYfsBwigGneLgYJTIh5urotnH63I5O+p6ZcVq7xc1lY= +github.com/spiral/goridge/v3 v3.1.0 h1:zbNCQaLj+QyM1e/uQP2RHbiZSlxJJvuLAE7GGA3EeYo= +github.com/spiral/goridge/v3 v3.1.0/go.mod h1:yL47qBzJ1ic89PpfROyTnLjR9XplaEmvXhnJSrooeGw= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= @@ -399,7 +400,6 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= @@ -657,8 +657,10 @@ google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/proto/kv/v1beta/kv.pb.go b/pkg/proto/kv/v1beta/kv.pb.go index 76450869..622967b8 100644 --- a/pkg/proto/kv/v1beta/kv.pb.go +++ b/pkg/proto/kv/v1beta/kv.pb.go @@ -20,7 +20,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type Payload struct { +type Request struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -30,8 +30,8 @@ type Payload struct { Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"` } -func (x *Payload) Reset() { - *x = Payload{} +func (x *Request) Reset() { + *x = Request{} if protoimpl.UnsafeEnabled { mi := &file_kv_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -39,13 +39,13 @@ func (x *Payload) Reset() { } } -func (x *Payload) String() string { +func (x *Request) String() string { return protoimpl.X.MessageStringOf(x) } -func (*Payload) ProtoMessage() {} +func (*Request) ProtoMessage() {} -func (x *Payload) ProtoReflect() protoreflect.Message { +func (x *Request) ProtoReflect() protoreflect.Message { mi := &file_kv_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -57,19 +57,19 @@ func (x *Payload) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use Payload.ProtoReflect.Descriptor instead. -func (*Payload) Descriptor() ([]byte, []int) { +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { return file_kv_proto_rawDescGZIP(), []int{0} } -func (x *Payload) GetStorage() string { +func (x *Request) GetStorage() string { if x != nil { return x.Storage } return "" } -func (x *Payload) GetItems() []*Item { +func (x *Request) GetItems() []*Item { if x != nil { return x.Items } @@ -82,7 +82,7 @@ type Item struct { unknownFields protoimpl.UnknownFields Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` // RFC 3339 Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"` } @@ -126,11 +126,11 @@ func (x *Item) GetKey() string { return "" } -func (x *Item) GetValue() string { +func (x *Item) GetValue() []byte { if x != nil { return x.Value } - return "" + return nil } func (x *Item) GetTimeout() string { @@ -140,22 +140,73 @@ func (x *Item) GetTimeout() string { return "" } +// KV response for the KV RPC methods +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Items []*Item `protobuf:"bytes,1,rep,name=items,proto3" json:"items,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_kv_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_kv_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_kv_proto_rawDescGZIP(), []int{2} +} + +func (x *Response) GetItems() []*Item { + if x != nil { + return x.Items + } + return nil +} + var File_kv_proto protoreflect.FileDescriptor var file_kv_proto_rawDesc = []byte{ 0x0a, 0x08, 0x6b, 0x76, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x6b, 0x76, 0x2e, 0x76, - 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x4a, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x4a, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6b, 0x76, 0x2e, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0d, 0x5a, 0x0b, 0x2e, - 0x2f, 0x3b, 0x6b, 0x76, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x31, 0x0a, 0x08, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, + 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6b, 0x76, 0x2e, 0x76, 0x31, 0x62, 0x65, + 0x74, 0x61, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x42, 0x0d, + 0x5a, 0x0b, 0x2e, 0x2f, 0x3b, 0x6b, 0x76, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -170,18 +221,20 @@ func file_kv_proto_rawDescGZIP() []byte { return file_kv_proto_rawDescData } -var file_kv_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_kv_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_kv_proto_goTypes = []interface{}{ - (*Payload)(nil), // 0: kv.v1beta.Payload - (*Item)(nil), // 1: kv.v1beta.Item + (*Request)(nil), // 0: kv.v1beta.Request + (*Item)(nil), // 1: kv.v1beta.Item + (*Response)(nil), // 2: kv.v1beta.Response } var file_kv_proto_depIdxs = []int32{ - 1, // 0: kv.v1beta.Payload.items:type_name -> kv.v1beta.Item - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 1, // 0: kv.v1beta.Request.items:type_name -> kv.v1beta.Item + 1, // 1: kv.v1beta.Response.items:type_name -> kv.v1beta.Item + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_kv_proto_init() } @@ -191,7 +244,7 @@ func file_kv_proto_init() { } if !protoimpl.UnsafeEnabled { file_kv_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Payload); i { + switch v := v.(*Request); i { case 0: return &v.state case 1: @@ -214,6 +267,18 @@ func file_kv_proto_init() { return nil } } + file_kv_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -221,7 +286,7 @@ func file_kv_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_kv_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, diff --git a/pkg/proto/kv/v1beta/kv.proto b/pkg/proto/kv/v1beta/kv.proto index 1ec0e6b7..1e3b8177 100644 --- a/pkg/proto/kv/v1beta/kv.proto +++ b/pkg/proto/kv/v1beta/kv.proto @@ -3,7 +3,7 @@ syntax = "proto3"; package kv.v1beta; option go_package = "./;kvv1beta"; -message Payload { +message Request { // could be an enum in the future string storage = 1; repeated Item items = 2; @@ -11,7 +11,12 @@ message Payload { message Item { string key = 1; - string value = 2; + bytes value = 2; // RFC 3339 string timeout = 3; } + +// KV response for the KV RPC methods +message Response { + repeated Item items = 1; +} diff --git a/pkg/proto/websockets/v1beta/websockets.pb.go b/pkg/proto/websockets/v1beta/websockets.pb.go index f04c3cb2..d39b55da 100644 --- a/pkg/proto/websockets/v1beta/websockets.pb.go +++ b/pkg/proto/websockets/v1beta/websockets.pb.go @@ -91,7 +91,7 @@ func (x *Message) GetPayload() []byte { return nil } -type Messages struct { +type Request struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -99,8 +99,8 @@ type Messages struct { Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` } -func (x *Messages) Reset() { - *x = Messages{} +func (x *Request) Reset() { + *x = Request{} if protoimpl.UnsafeEnabled { mi := &file_websockets_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -108,13 +108,13 @@ func (x *Messages) Reset() { } } -func (x *Messages) String() string { +func (x *Request) String() string { return protoimpl.X.MessageStringOf(x) } -func (*Messages) ProtoMessage() {} +func (*Request) ProtoMessage() {} -func (x *Messages) ProtoReflect() protoreflect.Message { +func (x *Request) ProtoReflect() protoreflect.Message { mi := &file_websockets_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -126,18 +126,65 @@ func (x *Messages) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use Messages.ProtoReflect.Descriptor instead. -func (*Messages) Descriptor() ([]byte, []int) { +// Deprecated: Use Request.ProtoReflect.Descriptor instead. +func (*Request) Descriptor() ([]byte, []int) { return file_websockets_proto_rawDescGZIP(), []int{1} } -func (x *Messages) GetMessages() []*Message { +func (x *Request) GetMessages() []*Message { if x != nil { return x.Messages } return nil } +type Response struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ok bool `protobuf:"varint,1,opt,name=ok,proto3" json:"ok,omitempty"` +} + +func (x *Response) Reset() { + *x = Response{} + if protoimpl.UnsafeEnabled { + mi := &file_websockets_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Response) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Response) ProtoMessage() {} + +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_websockets_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_websockets_proto_rawDescGZIP(), []int{2} +} + +func (x *Response) GetOk() bool { + if x != nil { + return x.Ok + } + return false +} + var File_websockets_proto protoreflect.FileDescriptor var file_websockets_proto_rawDesc = []byte{ @@ -150,13 +197,15 @@ var file_websockets_proto_rawDesc = []byte{ 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, - 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x42, 0x0a, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, - 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, - 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, - 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77, - 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x41, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x1a, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x02, 0x6f, 0x6b, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, + 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -171,13 +220,14 @@ func file_websockets_proto_rawDescGZIP() []byte { return file_websockets_proto_rawDescData } -var file_websockets_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_websockets_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_websockets_proto_goTypes = []interface{}{ (*Message)(nil), // 0: websockets.v1beta.Message - (*Messages)(nil), // 1: websockets.v1beta.Messages + (*Request)(nil), // 1: websockets.v1beta.Request + (*Response)(nil), // 2: websockets.v1beta.Response } var file_websockets_proto_depIdxs = []int32{ - 0, // 0: websockets.v1beta.Messages.messages:type_name -> websockets.v1beta.Message + 0, // 0: websockets.v1beta.Request.messages:type_name -> websockets.v1beta.Message 1, // [1:1] is the sub-list for method output_type 1, // [1:1] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name @@ -204,7 +254,19 @@ func file_websockets_proto_init() { } } file_websockets_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Messages); i { + switch v := v.(*Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_websockets_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { case 0: return &v.state case 1: @@ -222,7 +284,7 @@ func file_websockets_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_websockets_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, diff --git a/pkg/proto/websockets/v1beta/websockets.proto b/pkg/proto/websockets/v1beta/websockets.proto index a61da93d..ede3cde9 100644 --- a/pkg/proto/websockets/v1beta/websockets.proto +++ b/pkg/proto/websockets/v1beta/websockets.proto @@ -10,6 +10,12 @@ message Message { bytes payload = 4; } -message Messages { +// RPC request with messages +message Request { repeated Message messages = 1; } + +// RPC response (false in case of error) +message Response { + bool ok = 1; +} diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index ba873513..253b9d33 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -162,7 +162,7 @@ func (d *Driver) Get(key string) ([]byte, error) { return val, nil } -func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("boltdb_driver_mget") // defense if keys == nil { @@ -177,7 +177,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string][]byte, len(keys)) err := d.DB.View(func(tx *bolt.Tx) error { b := tx.Bucket(d.bucket) @@ -186,7 +186,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } buf := new(bytes.Buffer) - var out string + var out []byte buf.Grow(100) for i := range keys { value := b.Get([]byte(keys[i])) @@ -200,7 +200,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } m[keys[i]] = out buf.Reset() - out = "" + out = nil } } @@ -241,8 +241,8 @@ func (d *Driver) Set(items ...*kvv1.Item) error { // performance note: pass a prepared bytes slice with initial cap // we can't move buf and gob out of loop, because we need to clear both from data // but gob will contain (w/o re-init) the past data - buf := bytes.Buffer{} - encoder := gob.NewEncoder(&buf) + buf := new(bytes.Buffer) + encoder := gob.NewEncoder(buf) if errors.Is(errors.EmptyItem, err) { return errors.E(op, errors.EmptyItem) } @@ -342,7 +342,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error { return nil } -func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { +func (d *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("boltdb_driver_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -356,7 +356,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string]string, len(keys)) for i := range keys { if item, ok := d.gc.Load(keys[i]); ok { diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 8ea515d0..c1f79cbb 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -10,7 +10,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -97,7 +96,7 @@ func (d *Driver) Get(key string) ([]byte, error) { // MGet return map with key -- string // and map value as value -- []byte -func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("memcached_plugin_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -111,7 +110,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string][]byte, len(keys)) for i := range keys { // Here also MultiGet data, err := d.client.Get(keys[i]) @@ -150,7 +149,7 @@ func (d *Driver) Set(items ...*kvv1.Item) error { memcachedItem := &memcache.Item{ Key: items[i].Key, // unsafe convert - Value: utils.AsBytes(items[i].Value), + Value: items[i].Value, Flags: 0, } @@ -206,7 +205,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error { } // TTL return time in seconds (int32) for a given keys -func (d *Driver) TTL(_ ...string) (map[string]interface{}, error) { +func (d *Driver) TTL(_ ...string) (map[string]string, error) { const op = errors.Op("memcached_plugin_ttl") return nil, errors.E(op, errors.Str("not valid request for memcached, see https://github.com/memcached/memcached/issues/239")) } diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index 3158adee..9b7d7259 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -10,7 +10,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -72,12 +71,12 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return utils.AsBytes(data.(*kvv1.Item).Value), nil + return data.(*kvv1.Item).Value, nil } return nil, nil } -func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { +func (s *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("in_memory_plugin_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -91,7 +90,7 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string][]byte, len(keys)) for i := range keys { if value, ok := s.heap.Load(keys[i]); ok { @@ -160,7 +159,7 @@ func (s *Driver) MExpire(items ...*kvv1.Item) error { return nil } -func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { +func (s *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("in_memory_plugin_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -174,7 +173,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string]string, len(keys)) for i := range keys { if item, ok := s.heap.Load(keys[i]); ok { diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go index 0aaa6352..66cb8384 100644 --- a/plugins/kv/drivers/redis/driver.go +++ b/plugins/kv/drivers/redis/driver.go @@ -11,6 +11,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/utils" ) type Driver struct { @@ -101,7 +102,7 @@ func (d *Driver) Get(key string) ([]byte, error) { // MGet loads content of multiple values (some values might be skipped). // https://redis.io/commands/mget // Returns slice with the interfaces with values -func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { +func (d *Driver) MGet(keys ...string) (map[string][]byte, error) { const op = errors.Op("redis_driver_mget") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -115,7 +116,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string][]byte, len(keys)) for _, k := range keys { cmd := d.universalClient.Get(context.Background(), k) @@ -126,7 +127,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { return nil, errors.E(op, cmd.Err()) } - m[k] = cmd.Val() + m[k] = utils.AsBytes(cmd.Val()) } return m, nil @@ -213,7 +214,7 @@ func (d *Driver) MExpire(items ...*kvv1.Item) error { // TTL https://redis.io/commands/ttl // return time in seconds (float64) for a given keys -func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { +func (d *Driver) TTL(keys ...string) (map[string]string, error) { const op = errors.Op("redis_driver_ttl") if keys == nil { return nil, errors.E(op, errors.NoKeys) @@ -227,7 +228,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { } } - m := make(map[string]interface{}, len(keys)) + m := make(map[string]string, len(keys)) for _, key := range keys { duration, err := d.universalClient.TTL(context.Background(), key).Result() @@ -235,7 +236,7 @@ func (d *Driver) TTL(keys ...string) (map[string]interface{}, error) { return nil, err } - m[key] = duration.Seconds() + m[key] = duration.String() } return m, nil } diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 7841f9a2..744c6b51 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -12,7 +12,7 @@ type Storage interface { // MGet loads content of multiple values // Returns the map with existing keys and associated values - MGet(keys ...string) (map[string]interface{}, error) + MGet(keys ...string) (map[string][]byte, error) // Set used to upload item to KV with TTL // 0 value in TTL means no TTL @@ -23,7 +23,7 @@ type Storage interface { // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb - TTL(keys ...string) (map[string]interface{}, error) + TTL(keys ...string) (map[string]string, error) // Delete one or multiple keys. Delete(keys ...string) error diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 557d3ee1..ab1f7f31 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -17,7 +17,7 @@ type rpc struct { } // Has accept []*kvv1.Payload proto payload with Storage and Item -func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { +func (r *rpc) Has(in *kvv1.Request, out *kvv1.Response) error { const op = errors.Op("rpc_has") if in.GetStorage() == "" { @@ -38,7 +38,12 @@ func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { // update the value in the pointer // save the result - *res = ret + out.Items = make([]*kvv1.Item, 0, len(ret)) + for k := range ret { + out.Items = append(out.Items, &kvv1.Item{ + Key: k, + }) + } return nil } @@ -46,7 +51,7 @@ func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { } // Set accept proto payload with Storage and Item -func (r *rpc) Set(in *kvv1.Payload, ok *bool) error { +func (r *rpc) Set(in *kvv1.Request, _ *kvv1.Response) error { const op = errors.Op("rpc_set") if st, exists := r.storages[in.GetStorage()]; exists { @@ -56,16 +61,14 @@ func (r *rpc) Set(in *kvv1.Payload, ok *bool) error { } // save the result - *ok = true return nil } - *ok = false return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } // MGet accept proto payload with Storage and Item -func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { +func (r *rpc) MGet(in *kvv1.Request, out *kvv1.Response) error { const op = errors.Op("rpc_mget") keys := make([]string, 0, len(in.GetItems())) @@ -80,8 +83,13 @@ func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { return errors.E(op, err) } - // save the result - *res = ret + out.Items = make([]*kvv1.Item, 0, len(ret)) + for k := range ret { + out.Items = append(out.Items, &kvv1.Item{ + Key: k, + Value: ret[k], + }) + } return nil } @@ -89,7 +97,7 @@ func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { } // MExpire accept proto payload with Storage and Item -func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error { +func (r *rpc) MExpire(in *kvv1.Request, _ *kvv1.Response) error { const op = errors.Op("rpc_mexpire") if st, exists := r.storages[in.GetStorage()]; exists { @@ -98,17 +106,14 @@ func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error { return errors.E(op, err) } - // save the result - *ok = true return nil } - *ok = false return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } // TTL accept proto payload with Storage and Item -func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { +func (r *rpc) TTL(in *kvv1.Request, out *kvv1.Response) error { const op = errors.Op("rpc_ttl") keys := make([]string, 0, len(in.GetItems())) @@ -122,8 +127,14 @@ func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { return errors.E(op, err) } - // save the result - *res = ret + out.Items = make([]*kvv1.Item, 0, len(ret)) + for k := range ret { + out.Items = append(out.Items, &kvv1.Item{ + Key: k, + Timeout: ret[k], + }) + } + return nil } @@ -131,7 +142,7 @@ func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { } // Delete accept proto payload with Storage and Item -func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error { +func (r *rpc) Delete(in *kvv1.Request, _ *kvv1.Response) error { const op = errors.Op("rcp_delete") keys := make([]string, 0, len(in.GetItems())) @@ -145,11 +156,8 @@ func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error { return errors.E(op, err) } - // save the result - *ok = true return nil } - *ok = false return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 00c1dd91..80697fa2 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -15,12 +15,12 @@ type rpc struct { // Publish ... msg is a proto decoded payload // see: pkg/pubsub/message.fbs -func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error { +func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error { const op = errors.Op("broadcast_publish") // just return in case of nil message if in == nil { - *ok = true + out.Ok = false return nil } @@ -36,23 +36,23 @@ func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error { err = r.plugin.Publish(bb) if err != nil { - *ok = false + out.Ok = false return errors.E(op, err) } } - *ok = true + out.Ok = false return nil } // PublishAsync ... // see: pkg/pubsub/message.fbs -func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error { +func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error { const op = errors.Op("publish_async") // just return in case of nil message if in == nil { - *ok = true + out.Ok = false return nil } @@ -69,6 +69,6 @@ func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error { r.plugin.PublishAsync(bb) } - *ok = true + out.Ok = false return nil } diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index 760b6951..fd8a58cf 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -107,21 +107,19 @@ func kvSetTest(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - p := &payload.Payload{ + p := &payload.Request{ Storage: "boltdb-south", Items: []*payload.Item{ { Key: "key", - Value: "val", + Value: []byte("val"), }, }, } - var ok bool - - err = client.Call("kv.Set", p, &ok) + resp := &payload.Response{} + err = client.Call("kv.Set", p, resp) assert.NoError(t, err) - assert.True(t, ok, "Set return result") } func kvHasTest(t *testing.T) { @@ -129,20 +127,20 @@ func kvHasTest(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - p := &payload.Payload{ + p := &payload.Request{ Storage: "boltdb-south", Items: []*payload.Item{ { Key: "key", - Value: "val", + Value: []byte("val"), }, }, } - var ret map[string]bool - - err = client.Call("kv.Has", p, &ret) + ret := &payload.Response{} + err = client.Call("kv.Has", p, ret) assert.NoError(t, err) + assert.Len(t, ret.GetItems(), 1) } func TestBoltDb(t *testing.T) { @@ -221,8 +219,7 @@ func testRPCMethods(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - - keys := &payload.Payload{ + keys := &payload.Request{ Storage: "boltdb-rr", Items: []*payload.Item{ { @@ -237,63 +234,59 @@ func testRPCMethods(t *testing.T) { }, } - data := &payload.Payload{ + data := &payload.Request{ Storage: "boltdb-rr", Items: []*payload.Item{ { Key: "a", - Value: "aa", + Value: []byte("aa"), }, { Key: "b", - Value: "bb", + Value: []byte("bb"), }, { Key: "c", - Value: "cc", + Value: []byte("cc"), Timeout: tt, }, { Key: "d", - Value: "dd", + Value: []byte("dd"), }, { Key: "e", - Value: "ee", + Value: []byte("ee"), }, }, } - var setRes bool - + ret := &payload.Response{} // Register 3 keys with values - err = client.Call("kv.Set", data, &setRes) + err = client.Call("kv.Set", data, ret) assert.NoError(t, err) - assert.True(t, setRes) - ret := make(map[string]bool) - err = client.Call("kv.Has", keys, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys, ret) assert.NoError(t, err) - assert.Len(t, ret, 3) // should be 3 + assert.Len(t, ret.GetItems(), 3) // should be 3 // key "c" should be deleted time.Sleep(time.Second * 7) - ret = make(map[string]bool) - err = client.Call("kv.Has", keys, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys, ret) assert.NoError(t, err) - assert.Len(t, ret, 2) // should be 2 + assert.Len(t, ret.GetItems(), 2) // should be 2 - mGet := make(map[string]interface{}) - err = client.Call("kv.MGet", keys, &mGet) + ret = &payload.Response{} + err = client.Call("kv.MGet", keys, ret) assert.NoError(t, err) - assert.Len(t, mGet, 2) // c is expired - assert.Equal(t, "aa", mGet["a"].(string)) - assert.Equal(t, "bb", mGet["b"].(string)) + assert.Len(t, ret.GetItems(), 2) // c is expired tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := &payload.Payload{ + data2 := &payload.Request{ Storage: "boltdb-rr", Items: []*payload.Item{ { @@ -312,13 +305,12 @@ func testRPCMethods(t *testing.T) { } // MEXPIRE - var mExpRes bool - err = client.Call("kv.MExpire", data2, &mExpRes) + ret = &payload.Response{} + err = client.Call("kv.MExpire", data2, ret) assert.NoError(t, err) - assert.True(t, mExpRes) // TTL - keys2 := &payload.Payload{ + keys2 := &payload.Request{ Storage: "boltdb-rr", Items: []*payload.Item{ { @@ -333,20 +325,20 @@ func testRPCMethods(t *testing.T) { }, } - ttlRes := make(map[string]interface{}) - err = client.Call("kv.TTL", keys2, &ttlRes) + ret = &payload.Response{} + err = client.Call("kv.TTL", keys2, ret) assert.NoError(t, err) - assert.Len(t, ttlRes, 3) + assert.Len(t, ret.GetItems(), 3) // HAS AFTER TTL time.Sleep(time.Second * 15) - ret = make(map[string]bool) - err = client.Call("kv.Has", keys2, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys2, ret) assert.NoError(t, err) - assert.Len(t, ret, 0) + assert.Len(t, ret.GetItems(), 0) // DELETE - keysDel := &payload.Payload{ + keysDel := &payload.Request{ Storage: "boltdb-rr", Items: []*payload.Item{ { @@ -355,16 +347,15 @@ func testRPCMethods(t *testing.T) { }, } - var delRet bool - err = client.Call("kv.Delete", keysDel, &delRet) + ret = &payload.Response{} + err = client.Call("kv.Delete", keysDel, ret) assert.NoError(t, err) - assert.True(t, delRet) // HAS AFTER DELETE - ret = make(map[string]bool) - err = client.Call("kv.Has", keysDel, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keysDel, ret) assert.NoError(t, err) - assert.Len(t, ret, 0) + assert.Len(t, ret.GetItems(), 0) } func TestMemcached(t *testing.T) { @@ -429,7 +420,7 @@ func TestMemcached(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("testMemcachedRPCMethods", testRPCMethodsMemcached) + t.Run("MEMCACHED", testRPCMethodsMemcached) stopCh <- struct{}{} wg.Wait() } @@ -442,7 +433,7 @@ func testRPCMethodsMemcached(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := &payload.Payload{ + keys := &payload.Request{ Storage: "memcached-rr", Items: []*payload.Item{ { @@ -457,63 +448,59 @@ func testRPCMethodsMemcached(t *testing.T) { }, } - data := &payload.Payload{ + data := &payload.Request{ Storage: "memcached-rr", Items: []*payload.Item{ { Key: "a", - Value: "aa", + Value: []byte("aa"), }, { Key: "b", - Value: "bb", + Value: []byte("bb"), }, { Key: "c", - Value: "cc", + Value: []byte("cc"), Timeout: tt, }, { Key: "d", - Value: "dd", + Value: []byte("dd"), }, { Key: "e", - Value: "ee", + Value: []byte("ee"), }, }, } - var setRes bool - + ret := &payload.Response{} // Register 3 keys with values - err = client.Call("kv.Set", data, &setRes) + err = client.Call("kv.Set", data, ret) assert.NoError(t, err) - assert.True(t, setRes) - ret := make(map[string]bool) - err = client.Call("kv.Has", keys, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys, ret) assert.NoError(t, err) - assert.Len(t, ret, 3) // should be 3 + assert.Len(t, ret.GetItems(), 3) // should be 3 // key "c" should be deleted time.Sleep(time.Second * 7) - ret = make(map[string]bool) - err = client.Call("kv.Has", keys, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys, ret) assert.NoError(t, err) - assert.Len(t, ret, 2) // should be 2 + assert.Len(t, ret.GetItems(), 2) // should be 2 - mGet := make(map[string]interface{}) - err = client.Call("kv.MGet", keys, &mGet) + ret = &payload.Response{} + err = client.Call("kv.MGet", keys, ret) assert.NoError(t, err) - assert.Len(t, mGet, 2) // c is expired - assert.Equal(t, string("aa"), string(mGet["a"].([]byte))) - assert.Equal(t, string("bb"), string(mGet["b"].([]byte))) + assert.Len(t, ret.GetItems(), 2) // c is expired tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := &payload.Payload{ + data2 := &payload.Request{ Storage: "memcached-rr", Items: []*payload.Item{ { @@ -532,13 +519,12 @@ func testRPCMethodsMemcached(t *testing.T) { } // MEXPIRE - var mExpRes bool - err = client.Call("kv.MExpire", data2, &mExpRes) + ret = &payload.Response{} + err = client.Call("kv.MExpire", data2, ret) assert.NoError(t, err) - assert.True(t, mExpRes) // TTL call is not supported for the memcached driver - keys2 := &payload.Payload{ + keys2 := &payload.Request{ Storage: "memcached-rr", Items: []*payload.Item{ { @@ -553,20 +539,20 @@ func testRPCMethodsMemcached(t *testing.T) { }, } - ttlRes := make(map[string]interface{}) - err = client.Call("kv.TTL", keys2, &ttlRes) + ret = &payload.Response{} + err = client.Call("kv.TTL", keys2, ret) assert.Error(t, err) - assert.Len(t, ttlRes, 0) + assert.Len(t, ret.GetItems(), 0) // HAS AFTER TTL time.Sleep(time.Second * 15) - ret = make(map[string]bool) - err = client.Call("kv.Has", keys2, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys2, ret) assert.NoError(t, err) - assert.Len(t, ret, 0) + assert.Len(t, ret.GetItems(), 0) // DELETE - keysDel := &payload.Payload{ + keysDel := &payload.Request{ Storage: "memcached-rr", Items: []*payload.Item{ { @@ -575,16 +561,15 @@ func testRPCMethodsMemcached(t *testing.T) { }, } - var delRet bool - err = client.Call("kv.Delete", keysDel, &delRet) + ret = &payload.Response{} + err = client.Call("kv.Delete", keysDel, ret) assert.NoError(t, err) - assert.True(t, delRet) // HAS AFTER DELETE - ret = make(map[string]bool) - err = client.Call("kv.Has", keysDel, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keysDel, ret) assert.NoError(t, err) - assert.Len(t, ret, 0) + assert.Len(t, ret.GetItems(), 0) } func TestInMemory(t *testing.T) { @@ -649,7 +634,7 @@ func TestInMemory(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("testInMemoryRPCMethods", testRPCMethodsInMemory) + t.Run("INMEMORY", testRPCMethodsInMemory) stopCh <- struct{}{} wg.Wait() } @@ -660,8 +645,9 @@ func testRPCMethodsInMemory(t *testing.T) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // add 5 second ttl + tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := &payload.Payload{ + keys := &payload.Request{ Storage: "memory-rr", Items: []*payload.Item{ { @@ -676,63 +662,59 @@ func testRPCMethodsInMemory(t *testing.T) { }, } - data := &payload.Payload{ + data := &payload.Request{ Storage: "memory-rr", Items: []*payload.Item{ { Key: "a", - Value: "aa", + Value: []byte("aa"), }, { Key: "b", - Value: "bb", + Value: []byte("bb"), }, { Key: "c", - Value: "cc", + Value: []byte("cc"), Timeout: tt, }, { Key: "d", - Value: "dd", + Value: []byte("dd"), }, { Key: "e", - Value: "ee", + Value: []byte("ee"), }, }, } - var setRes bool - + ret := &payload.Response{} // Register 3 keys with values - err = client.Call("kv.Set", data, &setRes) + err = client.Call("kv.Set", data, ret) assert.NoError(t, err) - assert.True(t, setRes) - ret := make(map[string]bool) - err = client.Call("kv.Has", keys, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys, ret) assert.NoError(t, err) - assert.Len(t, ret, 3) // should be 3 + assert.Len(t, ret.GetItems(), 3) // should be 3 // key "c" should be deleted time.Sleep(time.Second * 7) - ret = make(map[string]bool) - err = client.Call("kv.Has", keys, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys, ret) assert.NoError(t, err) - assert.Len(t, ret, 2) // should be 2 + assert.Len(t, ret.GetItems(), 2) // should be 2 - mGet := make(map[string]interface{}) - err = client.Call("kv.MGet", keys, &mGet) + ret = &payload.Response{} + err = client.Call("kv.MGet", keys, ret) assert.NoError(t, err) - assert.Len(t, mGet, 2) // c is expired - assert.Equal(t, "aa", mGet["a"].(string)) - assert.Equal(t, "bb", mGet["b"].(string)) + assert.Len(t, ret.GetItems(), 2) // c is expired tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := &payload.Payload{ + data2 := &payload.Request{ Storage: "memory-rr", Items: []*payload.Item{ { @@ -751,13 +733,12 @@ func testRPCMethodsInMemory(t *testing.T) { } // MEXPIRE - var mExpRes bool - err = client.Call("kv.MExpire", data2, &mExpRes) + ret = &payload.Response{} + err = client.Call("kv.MExpire", data2, ret) assert.NoError(t, err) - assert.True(t, mExpRes) // TTL - keys2 := &payload.Payload{ + keys2 := &payload.Request{ Storage: "memory-rr", Items: []*payload.Item{ { @@ -772,20 +753,20 @@ func testRPCMethodsInMemory(t *testing.T) { }, } - ttlRes := make(map[string]interface{}) - err = client.Call("kv.TTL", keys2, &ttlRes) + ret = &payload.Response{} + err = client.Call("kv.TTL", keys2, ret) assert.NoError(t, err) - assert.Len(t, ttlRes, 3) + assert.Len(t, ret.GetItems(), 3) // HAS AFTER TTL time.Sleep(time.Second * 15) - ret = make(map[string]bool) - err = client.Call("kv.Has", keys2, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys2, ret) assert.NoError(t, err) - assert.Len(t, ret, 0) + assert.Len(t, ret.GetItems(), 0) // DELETE - keysDel := &payload.Payload{ + keysDel := &payload.Request{ Storage: "memory-rr", Items: []*payload.Item{ { @@ -794,16 +775,15 @@ func testRPCMethodsInMemory(t *testing.T) { }, } - var delRet bool - err = client.Call("kv.Delete", keysDel, &delRet) + ret = &payload.Response{} + err = client.Call("kv.Delete", keysDel, ret) assert.NoError(t, err) - assert.True(t, delRet) // HAS AFTER DELETE - ret = make(map[string]bool) - err = client.Call("kv.Has", keysDel, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keysDel, ret) assert.NoError(t, err) - assert.Len(t, ret, 0) + assert.Len(t, ret.GetItems(), 0) } func TestRedis(t *testing.T) { @@ -868,7 +848,7 @@ func TestRedis(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("testRedisRPCMethods", testRPCMethodsRedis) + t.Run("REDIS", testRPCMethodsRedis) stopCh <- struct{}{} wg.Wait() } @@ -880,7 +860,7 @@ func testRPCMethodsRedis(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := &payload.Payload{ + keys := &payload.Request{ Storage: "redis-rr", Items: []*payload.Item{ { @@ -895,62 +875,59 @@ func testRPCMethodsRedis(t *testing.T) { }, } - data := &payload.Payload{ + data := &payload.Request{ Storage: "redis-rr", Items: []*payload.Item{ { Key: "a", - Value: "aa", + Value: []byte("aa"), }, { Key: "b", - Value: "bb", + Value: []byte("bb"), }, { Key: "c", - Value: "cc", + Value: []byte("cc"), Timeout: tt, }, { Key: "d", - Value: "dd", + Value: []byte("dd"), }, { Key: "e", - Value: "ee", + Value: []byte("ee"), }, }, } - var setRes bool - + ret := &payload.Response{} // Register 3 keys with values - err = client.Call("kv.Set", data, &setRes) + err = client.Call("kv.Set", data, ret) assert.NoError(t, err) - assert.True(t, setRes) - ret := make(map[string]bool) - err = client.Call("kv.Has", keys, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys, ret) assert.NoError(t, err) - assert.Len(t, ret, 3) // should be 3 + assert.Len(t, ret.GetItems(), 3) // should be 3 // key "c" should be deleted time.Sleep(time.Second * 7) - ret = make(map[string]bool) - err = client.Call("kv.Has", keys, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys, ret) assert.NoError(t, err) - assert.Len(t, ret, 2) // should be 2 + assert.Len(t, ret.GetItems(), 2) // should be 2 - mGet := make(map[string]interface{}) - err = client.Call("kv.MGet", keys, &mGet) + ret = &payload.Response{} + err = client.Call("kv.MGet", keys, ret) assert.NoError(t, err) - assert.Len(t, mGet, 2) // c is expired - assert.Equal(t, "aa", mGet["a"].(string)) - assert.Equal(t, "bb", mGet["b"].(string)) + assert.Len(t, ret.GetItems(), 2) // c is expired tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := &payload.Payload{ + + data2 := &payload.Request{ Storage: "redis-rr", Items: []*payload.Item{ { @@ -969,13 +946,12 @@ func testRPCMethodsRedis(t *testing.T) { } // MEXPIRE - var mExpRes bool - err = client.Call("kv.MExpire", data2, &mExpRes) + ret = &payload.Response{} + err = client.Call("kv.MExpire", data2, ret) assert.NoError(t, err) - assert.True(t, mExpRes) // TTL - keys2 := &payload.Payload{ + keys2 := &payload.Request{ Storage: "redis-rr", Items: []*payload.Item{ { @@ -990,20 +966,20 @@ func testRPCMethodsRedis(t *testing.T) { }, } - ttlRes := make(map[string]interface{}) - err = client.Call("kv.TTL", keys2, &ttlRes) + ret = &payload.Response{} + err = client.Call("kv.TTL", keys2, ret) assert.NoError(t, err) - assert.Len(t, ttlRes, 3) + assert.Len(t, ret.GetItems(), 3) // HAS AFTER TTL time.Sleep(time.Second * 15) - ret = make(map[string]bool) - err = client.Call("kv.Has", keys2, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keys2, ret) assert.NoError(t, err) - assert.Len(t, ret, 0) + assert.Len(t, ret.GetItems(), 0) // DELETE - keysDel := &payload.Payload{ + keysDel := &payload.Request{ Storage: "redis-rr", Items: []*payload.Item{ { @@ -1012,14 +988,13 @@ func testRPCMethodsRedis(t *testing.T) { }, } - var delRet bool - err = client.Call("kv.Delete", keysDel, &delRet) + ret = &payload.Response{} + err = client.Call("kv.Delete", keysDel, ret) assert.NoError(t, err) - assert.True(t, delRet) // HAS AFTER DELETE - ret = make(map[string]bool) - err = client.Call("kv.Has", keysDel, &ret) + ret = &payload.Response{} + err = client.Call("kv.Has", keysDel, ret) assert.NoError(t, err) - assert.Len(t, ret, 0) + assert.Len(t, ret.GetItems(), 0) } diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 4e4c09f1..c1fd5f3e 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -28,20 +28,6 @@ import ( "github.com/stretchr/testify/assert" ) -type Msg struct { - // Topic makeMessage been pushed into. - Topics []string `json:"topic"` - - // Command (join, leave, headers) - Command string `json:"command"` - - // Broker (redis, memory) - Broker string `json:"broker"` - - // Payload to be broadcasted - Payload []byte `json:"payload"` -} - func TestBroadcastInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) @@ -819,8 +805,8 @@ func publish(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret) + ret := &websocketsv1.Response{} + err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret) if err != nil { panic(err) } @@ -834,8 +820,8 @@ func publishAsync(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), &ret) + ret := &websocketsv1.Response{} + err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret) if err != nil { panic(err) } @@ -849,8 +835,8 @@ func publishAsync2(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret) + ret := &websocketsv1.Response{} + err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) if err != nil { panic(err) } @@ -864,8 +850,8 @@ func publish2(command string, broker string, topics ...string) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), &ret) + ret := &websocketsv1.Response{} + err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) if err != nil { panic(err) } @@ -879,8 +865,8 @@ func messageWS(command string, broker string, payload []byte, topics ...string) } } -func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Messages { - m := &websocketsv1.Messages{ +func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request { + m := &websocketsv1.Request{ Messages: []*websocketsv1.Message{ { Topics: topics, |