summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-08 22:36:40 +0300
committerGitHub <[email protected]>2021-06-08 22:36:40 +0300
commit4f3e16892479db4bd8280a46987f3105e46e5c96 (patch)
tree13c4c3f380d8309b95c9600cc2000d1d5ab87cda
parent49ce25e80ba99ac91bce7ea2b9b632de53e07c0d (diff)
parentcc271dceb13d3929f0382311dfce3dfed2ce04ce (diff)
#712 feat(ws, kv): switch from the `flatbuffers` to the `protobuf`v2.3.0-beta.2
#712 feat(ws, kv): switch from the `flatbuffers` to the `protobuf`
-rw-r--r--CHANGELOG.md2
-rw-r--r--go.mod3
-rw-r--r--go.sum2
-rw-r--r--pkg/proto/kv/v1beta/kv.pb.go236
-rw-r--r--pkg/proto/kv/v1beta/kv.proto17
-rw-r--r--pkg/proto/websockets/v1beta/websockets.pb.go237
-rw-r--r--pkg/proto/websockets/v1beta/websockets.proto15
-rw-r--r--pkg/pubsub/interface.go4
-rw-r--r--pkg/pubsub/message.fbs14
-rw-r--r--pkg/pubsub/message.go15
-rw-r--r--pkg/pubsub/message/Message.go118
-rw-r--r--pkg/pubsub/message/Messages.go67
-rw-r--r--plugins/kv/drivers/boltdb/driver.go17
-rw-r--r--plugins/kv/drivers/memcached/driver.go18
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/drivers/memory/driver.go43
-rw-r--r--plugins/kv/drivers/redis/driver.go20
-rw-r--r--plugins/kv/interface.go14
-rw-r--r--plugins/kv/payload/generated/Item.go67
-rw-r--r--plugins/kv/payload/generated/Payload.go71
-rw-r--r--plugins/kv/payload/payload.fbs14
-rw-r--r--plugins/kv/rpc.go153
-rw-r--r--plugins/redis/fanin.go19
-rw-r--r--plugins/redis/plugin.go31
-rw-r--r--plugins/websockets/executor/executor.go3
-rw-r--r--plugins/websockets/memory/inMemory.go (renamed from plugins/memory/plugin.go)42
-rw-r--r--plugins/websockets/plugin.go40
-rw-r--r--plugins/websockets/pool/workers_pool.go54
-rw-r--r--plugins/websockets/rpc.go93
-rw-r--r--tests/plugins/kv/storage_plugin_test.go634
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go88
31 files changed, 1083 insertions, 1070 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 80ee8681..6f4eb607 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,7 +9,7 @@ v2.3.0 (08.06.2021)
- ✏️ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of
thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus
on 2CPU cores and 1GB of RAM)
-- ✏️ Flatbuffers binary messages for the `websockets` RPC calls under the hood.
+- ✏️ Protobuf binary messages for the `websockets` RPC calls under the hood.
- ✏️ Json-schemas for the config file v1.0 (it also registered
in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614))
- ✏️ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead)
diff --git a/go.mod b/go.mod
index 797881ca..0b8661c0 100644
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,7 @@ require (
github.com/go-redis/redis/v8 v8.9.0
github.com/gofiber/fiber/v2 v2.10.0
github.com/golang/mock v1.4.4
- github.com/google/flatbuffers v2.0.0+incompatible
+ github.com/golang/protobuf v1.4.3
github.com/google/uuid v1.2.0
github.com/json-iterator/go v1.1.11
github.com/klauspost/compress v1.13.0
@@ -36,4 +36,5 @@ require (
golang.org/x/net v0.0.0-20210226101413-39120d07d75e
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015
+ google.golang.org/protobuf v1.23.0
)
diff --git a/go.sum b/go.sum
index a37991a8..f0b99003 100644
--- a/go.sum
+++ b/go.sum
@@ -148,8 +148,6 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
-github.com/google/flatbuffers v2.0.0+incompatible h1:dicJ2oXwypfwUGnB2/TYWYEKiuk9eYQlQO/AnOHl5mI=
-github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
diff --git a/pkg/proto/kv/v1beta/kv.pb.go b/pkg/proto/kv/v1beta/kv.pb.go
new file mode 100644
index 00000000..76450869
--- /dev/null
+++ b/pkg/proto/kv/v1beta/kv.pb.go
@@ -0,0 +1,236 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.26.0
+// protoc v3.16.0
+// source: kv.proto
+
+package kvv1beta
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type Payload struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // could be an enum in the future
+ Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"`
+ Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
+}
+
+func (x *Payload) Reset() {
+ *x = Payload{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_kv_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Payload) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Payload) ProtoMessage() {}
+
+func (x *Payload) ProtoReflect() protoreflect.Message {
+ mi := &file_kv_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Payload.ProtoReflect.Descriptor instead.
+func (*Payload) Descriptor() ([]byte, []int) {
+ return file_kv_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Payload) GetStorage() string {
+ if x != nil {
+ return x.Storage
+ }
+ return ""
+}
+
+func (x *Payload) GetItems() []*Item {
+ if x != nil {
+ return x.Items
+ }
+ return nil
+}
+
+type Item struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+ Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+ // RFC 3339
+ Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"`
+}
+
+func (x *Item) Reset() {
+ *x = Item{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_kv_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Item) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Item) ProtoMessage() {}
+
+func (x *Item) ProtoReflect() protoreflect.Message {
+ mi := &file_kv_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Item.ProtoReflect.Descriptor instead.
+func (*Item) Descriptor() ([]byte, []int) {
+ return file_kv_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *Item) GetKey() string {
+ if x != nil {
+ return x.Key
+ }
+ return ""
+}
+
+func (x *Item) GetValue() string {
+ if x != nil {
+ return x.Value
+ }
+ return ""
+}
+
+func (x *Item) GetTimeout() string {
+ if x != nil {
+ return x.Timeout
+ }
+ return ""
+}
+
+var File_kv_proto protoreflect.FileDescriptor
+
+var file_kv_proto_rawDesc = []byte{
+ 0x0a, 0x08, 0x6b, 0x76, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x6b, 0x76, 0x2e, 0x76,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x4a, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+ 0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x69, 0x74,
+ 0x65, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6b, 0x76, 0x2e, 0x76,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d,
+ 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76,
+ 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
+ 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01,
+ 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0d, 0x5a, 0x0b, 0x2e,
+ 0x2f, 0x3b, 0x6b, 0x76, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x33,
+}
+
+var (
+ file_kv_proto_rawDescOnce sync.Once
+ file_kv_proto_rawDescData = file_kv_proto_rawDesc
+)
+
+func file_kv_proto_rawDescGZIP() []byte {
+ file_kv_proto_rawDescOnce.Do(func() {
+ file_kv_proto_rawDescData = protoimpl.X.CompressGZIP(file_kv_proto_rawDescData)
+ })
+ return file_kv_proto_rawDescData
+}
+
+var file_kv_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_kv_proto_goTypes = []interface{}{
+ (*Payload)(nil), // 0: kv.v1beta.Payload
+ (*Item)(nil), // 1: kv.v1beta.Item
+}
+var file_kv_proto_depIdxs = []int32{
+ 1, // 0: kv.v1beta.Payload.items:type_name -> kv.v1beta.Item
+ 1, // [1:1] is the sub-list for method output_type
+ 1, // [1:1] is the sub-list for method input_type
+ 1, // [1:1] is the sub-list for extension type_name
+ 1, // [1:1] is the sub-list for extension extendee
+ 0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_kv_proto_init() }
+func file_kv_proto_init() {
+ if File_kv_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_kv_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Payload); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_kv_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Item); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_kv_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 2,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_kv_proto_goTypes,
+ DependencyIndexes: file_kv_proto_depIdxs,
+ MessageInfos: file_kv_proto_msgTypes,
+ }.Build()
+ File_kv_proto = out.File
+ file_kv_proto_rawDesc = nil
+ file_kv_proto_goTypes = nil
+ file_kv_proto_depIdxs = nil
+}
diff --git a/pkg/proto/kv/v1beta/kv.proto b/pkg/proto/kv/v1beta/kv.proto
new file mode 100644
index 00000000..1ec0e6b7
--- /dev/null
+++ b/pkg/proto/kv/v1beta/kv.proto
@@ -0,0 +1,17 @@
+syntax = "proto3";
+
+package kv.v1beta;
+option go_package = "./;kvv1beta";
+
+message Payload {
+ // could be an enum in the future
+ string storage = 1;
+ repeated Item items = 2;
+}
+
+message Item {
+ string key = 1;
+ string value = 2;
+ // RFC 3339
+ string timeout = 3;
+}
diff --git a/pkg/proto/websockets/v1beta/websockets.pb.go b/pkg/proto/websockets/v1beta/websockets.pb.go
new file mode 100644
index 00000000..f04c3cb2
--- /dev/null
+++ b/pkg/proto/websockets/v1beta/websockets.pb.go
@@ -0,0 +1,237 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.26.0
+// protoc v3.16.0
+// source: websockets.proto
+
+package websocketsv1beta
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type Message struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Command string `protobuf:"bytes,1,opt,name=command,proto3" json:"command,omitempty"`
+ Broker string `protobuf:"bytes,2,opt,name=broker,proto3" json:"broker,omitempty"`
+ Topics []string `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"`
+ Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"`
+}
+
+func (x *Message) Reset() {
+ *x = Message{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_websockets_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Message) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Message) ProtoMessage() {}
+
+func (x *Message) ProtoReflect() protoreflect.Message {
+ mi := &file_websockets_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Message.ProtoReflect.Descriptor instead.
+func (*Message) Descriptor() ([]byte, []int) {
+ return file_websockets_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Message) GetCommand() string {
+ if x != nil {
+ return x.Command
+ }
+ return ""
+}
+
+func (x *Message) GetBroker() string {
+ if x != nil {
+ return x.Broker
+ }
+ return ""
+}
+
+func (x *Message) GetTopics() []string {
+ if x != nil {
+ return x.Topics
+ }
+ return nil
+}
+
+func (x *Message) GetPayload() []byte {
+ if x != nil {
+ return x.Payload
+ }
+ return nil
+}
+
+type Messages struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
+}
+
+func (x *Messages) Reset() {
+ *x = Messages{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_websockets_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *Messages) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Messages) ProtoMessage() {}
+
+func (x *Messages) ProtoReflect() protoreflect.Message {
+ mi := &file_websockets_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use Messages.ProtoReflect.Descriptor instead.
+func (*Messages) Descriptor() ([]byte, []int) {
+ return file_websockets_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *Messages) GetMessages() []*Message {
+ if x != nil {
+ return x.Messages
+ }
+ return nil
+}
+
+var File_websockets_proto protoreflect.FileDescriptor
+
+var file_websockets_proto_rawDesc = []byte{
+ 0x0a, 0x10, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x12, 0x11, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
+ 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x72,
+ 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f, 0x6b,
+ 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x03,
+ 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61,
+ 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79,
+ 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x42, 0x0a, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73,
+ 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03,
+ 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e,
+ 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08,
+ 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77,
+ 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62,
+ 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_websockets_proto_rawDescOnce sync.Once
+ file_websockets_proto_rawDescData = file_websockets_proto_rawDesc
+)
+
+func file_websockets_proto_rawDescGZIP() []byte {
+ file_websockets_proto_rawDescOnce.Do(func() {
+ file_websockets_proto_rawDescData = protoimpl.X.CompressGZIP(file_websockets_proto_rawDescData)
+ })
+ return file_websockets_proto_rawDescData
+}
+
+var file_websockets_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_websockets_proto_goTypes = []interface{}{
+ (*Message)(nil), // 0: websockets.v1beta.Message
+ (*Messages)(nil), // 1: websockets.v1beta.Messages
+}
+var file_websockets_proto_depIdxs = []int32{
+ 0, // 0: websockets.v1beta.Messages.messages:type_name -> websockets.v1beta.Message
+ 1, // [1:1] is the sub-list for method output_type
+ 1, // [1:1] is the sub-list for method input_type
+ 1, // [1:1] is the sub-list for extension type_name
+ 1, // [1:1] is the sub-list for extension extendee
+ 0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_websockets_proto_init() }
+func file_websockets_proto_init() {
+ if File_websockets_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_websockets_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Message); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_websockets_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*Messages); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_websockets_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 2,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_websockets_proto_goTypes,
+ DependencyIndexes: file_websockets_proto_depIdxs,
+ MessageInfos: file_websockets_proto_msgTypes,
+ }.Build()
+ File_websockets_proto = out.File
+ file_websockets_proto_rawDesc = nil
+ file_websockets_proto_goTypes = nil
+ file_websockets_proto_depIdxs = nil
+}
diff --git a/pkg/proto/websockets/v1beta/websockets.proto b/pkg/proto/websockets/v1beta/websockets.proto
new file mode 100644
index 00000000..a61da93d
--- /dev/null
+++ b/pkg/proto/websockets/v1beta/websockets.proto
@@ -0,0 +1,15 @@
+syntax = "proto3";
+
+package websockets.v1beta;
+option go_package = "./;websocketsv1beta";
+
+message Message {
+ string command = 1;
+ string broker = 2;
+ repeated string topics = 3;
+ bytes payload = 4;
+}
+
+message Messages {
+ repeated Message messages = 1;
+}
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index eb65b4b7..4926cad6 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -1,6 +1,6 @@
package pubsub
-import "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+import websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
/*
This interface is in BETA. It might be changed.
@@ -42,5 +42,5 @@ type Publisher interface {
// Reader interface should return next message
type Reader interface {
- Next() (*message.Message, error)
+ Next() (*websocketsv1.Message, error)
}
diff --git a/pkg/pubsub/message.fbs b/pkg/pubsub/message.fbs
deleted file mode 100644
index 7e975894..00000000
--- a/pkg/pubsub/message.fbs
+++ /dev/null
@@ -1,14 +0,0 @@
-namespace message;
-
-table Message {
- command:string;
- broker:string;
- topics:[string];
- payload:[byte];
-}
-
-table Messages {
- messages:[Message];
-}
-
-root_type Messages;
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
deleted file mode 100644
index 74348722..00000000
--- a/pkg/pubsub/message.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package pubsub
-
-type Message struct {
- // Command (join, leave, headers)
- Command string `json:"command"`
-
- // Broker (redis, memory)
- Broker string `json:"broker"`
-
- // Topic message been pushed into.
- Topics []string `json:"topic"`
-
- // Payload to be broadcasted
- Payload []byte `json:"payload"`
-}
diff --git a/pkg/pubsub/message/Message.go b/pkg/pubsub/message/Message.go
deleted file mode 100644
index 26bbd12c..00000000
--- a/pkg/pubsub/message/Message.go
+++ /dev/null
@@ -1,118 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package message
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Message struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Message{}
- x.Init(buf, n+offset)
- return x
-}
-
-func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
- n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
- x := &Message{}
- x.Init(buf, n+offset+flatbuffers.SizeUint32)
- return x
-}
-
-func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Message) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Message) Command() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Message) Broker() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Message) Topics(j int) []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
- if o != 0 {
- a := rcv._tab.Vector(o)
- return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4))
- }
- return nil
-}
-
-func (rcv *Message) TopicsLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
- if o != 0 {
- return rcv._tab.VectorLen(o)
- }
- return 0
-}
-
-func (rcv *Message) Payload(j int) int8 {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
- if o != 0 {
- a := rcv._tab.Vector(o)
- return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1))
- }
- return 0
-}
-
-func (rcv *Message) PayloadLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
- if o != 0 {
- return rcv._tab.VectorLen(o)
- }
- return 0
-}
-
-func (rcv *Message) MutatePayload(j int, n int8) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
- if o != 0 {
- a := rcv._tab.Vector(o)
- return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n)
- }
- return false
-}
-
-func MessageStart(builder *flatbuffers.Builder) {
- builder.StartObject(4)
-}
-func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0)
-}
-func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0)
-}
-func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0)
-}
-func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
- return builder.StartVector(4, numElems, 4)
-}
-func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0)
-}
-func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
- return builder.StartVector(1, numElems, 1)
-}
-func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go
deleted file mode 100644
index 633b367d..00000000
--- a/pkg/pubsub/message/Messages.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package message
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Messages struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Messages{}
- x.Init(buf, n+offset)
- return x
-}
-
-func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
- n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
- x := &Messages{}
- x.Init(buf, n+offset+flatbuffers.SizeUint32)
- return x
-}
-
-func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Messages) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Messages) Messages(obj *Message, j int) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- x := rcv._tab.Vector(o)
- x += flatbuffers.UOffsetT(j) * 4
- x = rcv._tab.Indirect(x)
- obj.Init(rcv._tab.Bytes, x)
- return true
- }
- return false
-}
-
-func (rcv *Messages) MessagesLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.VectorLen(o)
- }
- return 0
-}
-
-func MessagesStart(builder *flatbuffers.Builder) {
- builder.StartObject(1)
-}
-func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0)
-}
-func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
- return builder.StartVector(4, numElems, 4)
-}
-func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 0f647cb1..ba873513 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -10,6 +10,7 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -213,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
// Set puts the K/V to the bolt
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -259,14 +260,14 @@ func (d *Driver) Set(items ...kv.Item) error {
// if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check
// we do not need mutex here, since we use sync.Map
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check correctness of provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
// Store key TTL in the separate map
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
buf.Reset()
@@ -323,20 +324,20 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- d.gc.Store(items[i].Key, items[i].TTL)
+ d.gc.Store(items[i].Key, items[i].Timeout)
}
return nil
}
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 02281ed5..8ea515d0 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,6 +6,7 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -134,14 +135,14 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
- if items[i] == EmptyItem {
+ if items[i] == nil {
return errors.E(op, errors.EmptyItem)
}
@@ -154,9 +155,9 @@ func (d *Driver) Set(items ...kv.Item) error {
}
// add additional TTL in case of TTL isn't empty
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// verify the TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -175,15 +176,18 @@ func (d *Driver) Set(items ...kv.Item) error {
// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// verify provided TTL
- t, err := time.Parse(time.RFC3339, items[i].TTL)
+ t, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index cde84f42..3997e0d4 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -9,8 +9,6 @@ import (
const PluginName = "memcached"
-var EmptyItem = kv.Item{}
-
type Plugin struct {
// config plugin
cfgPlugin config.Configurer
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index c2494ee7..3158adee 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -71,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(kv.Item).Value), nil
+ return utils.AsBytes(data.(*kvv1.Item).Value), nil
}
return nil, nil
}
@@ -94,24 +95,27 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = value.(kv.Item).Value
+ m[keys[i]] = value.(*kvv1.Item).Value
}
}
return m, nil
}
-func (s *Driver) Set(items ...kv.Item) error {
+func (s *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
for i := range items {
+ if items[i] == nil {
+ continue
+ }
// TTL is set
- if items[i].TTL != "" {
+ if items[i].Timeout != "" {
// check the TTL in the item
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return err
}
@@ -124,28 +128,31 @@ func (s *Driver) Set(items ...kv.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...kv.Item) error {
+func (s *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
- if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" {
+ if items[i] == nil {
+ continue
+ }
+ if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
// if key exist, overwrite it value
- if pItem, ok := s.heap.Load(items[i].Key); ok {
+ if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok {
// check that time is correct
- _, err := time.Parse(time.RFC3339, items[i].TTL)
+ _, err := time.Parse(time.RFC3339, items[i].Timeout)
if err != nil {
return errors.E(op, err)
}
- tmp := pItem.(kv.Item)
+ tmp := pItem.(*kvv1.Item)
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, kv.Item{
- Key: items[i].Key,
- Value: tmp.Value,
- TTL: items[i].TTL,
+ s.heap.Store(items[i].Key, &kvv1.Item{
+ Key: items[i].Key,
+ Value: tmp.Value,
+ Timeout: items[i].Timeout,
})
}
}
@@ -171,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = item.(kv.Item).TTL
+ m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
@@ -209,12 +216,12 @@ func (s *Driver) gc() {
case now := <-ticker.C:
// check every second
s.heap.Range(func(key, value interface{}) bool {
- v := value.(kv.Item)
- if v.TTL == "" {
+ v := value.(*kvv1.Item)
+ if v.Timeout == "" {
return true
}
- t, err := time.Parse(time.RFC3339, v.TTL)
+ t, err := time.Parse(time.RFC3339, v.Timeout)
if err != nil {
return false
}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index d0b541b2..0aaa6352 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -7,13 +7,12 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-var EmptyItem = kv.Item{}
-
type Driver struct {
universalClient redis.UniversalClient
log logger.Logger
@@ -139,24 +138,24 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
//
// Use expiration for `SETEX`-like behavior.
// Zero expiration means the key has no expiration time.
-func (d *Driver) Set(items ...kv.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
}
now := time.Now()
for _, item := range items {
- if item == EmptyItem {
+ if item == nil {
return errors.E(op, errors.EmptyKey)
}
- if item.TTL == "" {
+ if item.Timeout == "" {
err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err()
if err != nil {
return err
}
} else {
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}
@@ -188,15 +187,18 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire https://redis.io/commands/expire
// timeout in RFC3339
-func (d *Driver) MExpire(items ...kv.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_mexpire")
now := time.Now()
for _, item := range items {
- if item.TTL == "" || strings.TrimSpace(item.Key) == "" {
+ if item == nil {
+ continue
+ }
+ if item.Timeout == "" || strings.TrimSpace(item.Key) == "" {
return errors.E(op, errors.Str("should set timeout and at least one key"))
}
- t, err := time.Parse(time.RFC3339, item.TTL)
+ t, err := time.Parse(time.RFC3339, item.Timeout)
if err != nil {
return err
}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 20dbb8b3..7841f9a2 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,14 +1,6 @@
package kv
-// Item represents general storage item
-type Item struct {
- // Key of item
- Key string
- // Value of item
- Value string
- // live until time provided by TTL in RFC 3339 format
- TTL string
-}
+import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
// Storage represents single abstract storage.
type Storage interface {
@@ -24,10 +16,10 @@ type Storage interface {
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
- Set(items ...Item) error
+ Set(items ...*kvv1.Item) error
// MExpire sets the TTL for multiply keys
- MExpire(items ...Item) error
+ MExpire(items ...*kvv1.Item) error
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go
deleted file mode 100644
index 61bd6024..00000000
--- a/plugins/kv/payload/generated/Item.go
+++ /dev/null
@@ -1,67 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package generated
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Item struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Item{}
- x.Init(buf, n+offset)
- return x
-}
-
-func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Item) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Item) Key() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Item) Value() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Item) Timeout() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func ItemStart(builder *flatbuffers.Builder) {
- builder.StartObject(3)
-}
-func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0)
-}
-func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0)
-}
-func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0)
-}
-func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go
deleted file mode 100644
index a2c6cfdb..00000000
--- a/plugins/kv/payload/generated/Payload.go
+++ /dev/null
@@ -1,71 +0,0 @@
-// Code generated by the FlatBuffers compiler. DO NOT EDIT.
-
-package generated
-
-import (
- flatbuffers "github.com/google/flatbuffers/go"
-)
-
-type Payload struct {
- _tab flatbuffers.Table
-}
-
-func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload {
- n := flatbuffers.GetUOffsetT(buf[offset:])
- x := &Payload{}
- x.Init(buf, n+offset)
- return x
-}
-
-func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) {
- rcv._tab.Bytes = buf
- rcv._tab.Pos = i
-}
-
-func (rcv *Payload) Table() flatbuffers.Table {
- return rcv._tab
-}
-
-func (rcv *Payload) Storage() []byte {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
- if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
- }
- return nil
-}
-
-func (rcv *Payload) Items(obj *Item, j int) bool {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- x := rcv._tab.Vector(o)
- x += flatbuffers.UOffsetT(j) * 4
- x = rcv._tab.Indirect(x)
- obj.Init(rcv._tab.Bytes, x)
- return true
- }
- return false
-}
-
-func (rcv *Payload) ItemsLength() int {
- o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
- if o != 0 {
- return rcv._tab.VectorLen(o)
- }
- return 0
-}
-
-func PayloadStart(builder *flatbuffers.Builder) {
- builder.StartObject(2)
-}
-func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0)
-}
-func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0)
-}
-func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
- return builder.StartVector(4, numElems, 4)
-}
-func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
- return builder.EndObject()
-}
diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs
deleted file mode 100644
index 7e02c1a0..00000000
--- a/plugins/kv/payload/payload.fbs
+++ /dev/null
@@ -1,14 +0,0 @@
-namespace generated;
-
-table Payload {
- Storage:string;
- Items:[Item];
-}
-
-table Item {
- Key:string;
- Value:string;
- Timeout:string;
-}
-
-root_type Payload;
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 2d4babbe..a9efe0c4 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,9 +2,8 @@ package kv
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
)
// Wrapper for the plugin
@@ -17,23 +16,21 @@ type rpc struct {
log logger.Logger
}
-// Has accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Has(in []byte, res *map[string]bool) error {
+// Has accept []*payload.Payload proto payload with Storage and Item
+func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
const op = errors.Op("rpc_has")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
+ if in.Storage == "" {
+ return errors.E(op, errors.Str("no storage provided"))
+ }
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, utils.AsString(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok {
+ if st, ok := r.storages[in.Storage]; ok {
ret, err := st.Has(keys...)
if err != nil {
return err
@@ -45,35 +42,15 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Set accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Set(in []byte, ok *bool) error {
+// Set accept proto payload with Storage and Item
+func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_set")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- items := make([]Item, 0, dataRoot.ItemsLength())
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- Value: string(tmpItem.Value()),
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.Set(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.Set(in.GetItems()...)
if err != nil {
return err
}
@@ -84,26 +61,20 @@ func (r *rpc) Set(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MGet accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
+// MGet accept proto payload with Storage and Item
+func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_mget")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
+ keys := make([]string, 0, len(in.GetItems()))
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.MGet(keys...)
if err != nil {
return err
@@ -114,36 +85,15 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// MExpire accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) MExpire(in []byte, ok *bool) error {
+// MExpire accept proto payload with Storage and Item
+func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_mexpire")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
-
- // when unmarshalling the keys, simultaneously, fill up the slice with items
- items := make([]Item, 0, l)
- tmpItem := &generated.Item{}
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
-
- itc := Item{
- Key: string(tmpItem.Key()),
- // we set up timeout on the keys, so, value here is redundant
- Value: "",
- TTL: string(tmpItem.Timeout()),
- }
-
- items = append(items, itc)
- }
-
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
- err := st.MExpire(items...)
+ if st, exists := r.storages[in.GetStorage()]; exists {
+ err := st.MExpire(in.GetItems()...)
if err != nil {
return errors.E(op, err)
}
@@ -154,25 +104,19 @@ func (r *rpc) MExpire(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// TTL accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
+// TTL accept proto payload with Storage and Item
+func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_ttl")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
ret, err := st.TTL(keys...)
if err != nil {
return err
@@ -183,24 +127,19 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error {
return nil
}
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
-// Delete accept []byte flatbuffers payload with Storage and Item
-func (r *rpc) Delete(in []byte, ok *bool) error {
+// Delete accept proto payload with Storage and Item
+func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rcp_delete")
- dataRoot := generated.GetRootAsPayload(in, 0)
- l := dataRoot.ItemsLength()
- keys := make([]string, 0, l)
- tmpItem := &generated.Item{}
-
- for i := 0; i < l; i++ {
- if !dataRoot.Items(tmpItem, i) {
- continue
- }
- keys = append(keys, string(tmpItem.Key()))
+
+ keys := make([]string, 0, len(in.GetItems()))
+
+ for i := 0; i < len(in.GetItems()); i++ {
+ keys = append(keys, in.Items[i].Key)
}
- if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists {
+ if st, exists := r.storages[in.GetStorage()]; exists {
err := st.Delete(keys...)
if err != nil {
return errors.E(op, err)
@@ -212,5 +151,5 @@ func (r *rpc) Delete(in []byte, ok *bool) error {
}
*ok = false
- return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage()))
+ return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage()))
}
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 321bfaaa..ac9ebcc2 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,8 +4,9 @@ import (
"context"
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
@@ -22,13 +23,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *message.Message
+ out chan *websocketsv1.Message
exit chan struct{}
}
func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *message.Message, 100)
+ out := make(chan *websocketsv1.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -65,7 +66,15 @@ func (fi *FanIn) read() {
if !ok {
return
}
- fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0)
+
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
+ if err != nil {
+ fi.log.Error("message unmarshal")
+ continue
+ }
+
+ fi.out <- m
case <-fi.exit:
return
}
@@ -88,6 +97,6 @@ func (fi *FanIn) stop() error {
return nil
}
-func (fi *FanIn) consume() <-chan *message.Message {
+func (fi *FanIn) consume() <-chan *websocketsv1.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index b2603a40..47ffeb39 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,10 +6,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const PluginName = "redis"
@@ -107,10 +107,14 @@ func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return errors.E(err)
+ }
- for j := 0; j < fbsMsg.TopicsLength(); j++ {
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
if f.Err() != nil {
return f.Err()
}
@@ -122,12 +126,17 @@ func (p *Plugin) PublishAsync(msg []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
- for j := 0; j < fbsMsg.TopicsLength(); j++ {
- f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes)
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ p.log.Error("message unmarshal error")
+ return
+ }
+
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
if f.Err() != nil {
- p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error())
- return
+ p.log.Error("redis publish", "error", f.Err())
}
}
}()
@@ -200,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
return <-p.fanin.consume(), nil
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 69aad7d4..e3d47166 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -8,6 +8,7 @@ import (
"github.com/fasthttp/websocket"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
@@ -63,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- msg := &pubsub.Message{}
+ msg := &websocketsv1.Message{}
err = json.Unmarshal(data, msg)
if err != nil {
diff --git a/plugins/memory/plugin.go b/plugins/websockets/memory/inMemory.go
index 6732ff5d..cef28182 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/websockets/memory/inMemory.go
@@ -4,13 +4,10 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-const (
- PluginName string = "memory"
+ "google.golang.org/protobuf/proto"
)
type Plugin struct {
@@ -23,19 +20,12 @@ type Plugin struct {
storage bst.Storage
}
-func (p *Plugin) Init(log logger.Logger) error {
- p.log = log
- p.pushCh = make(chan []byte, 10)
- p.storage = bst.NewBST()
- return nil
-}
-
-// Available interface implementation for the plugin
-func (p *Plugin) Available() {}
-
-// Name is endure.Named interface implementation
-func (p *Plugin) Name() string {
- return PluginName
+func NewInMemory(log logger.Logger) pubsub.PubSub {
+ return &Plugin{
+ log: log,
+ pushCh: make(chan []byte, 10),
+ storage: bst.NewBST(),
+ }
}
func (p *Plugin) Publish(message []byte) error {
@@ -77,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
@@ -86,15 +76,19 @@ func (p *Plugin) Next() (*message.Message, error) {
p.RLock()
defer p.RUnlock()
- fbsMsg := message.GetRootAsMessage(msg, 0)
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return nil, err
+ }
// push only messages, which are subscribed
// TODO better???
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ for i := 0; i < len(m.GetTopics()); i++ {
// if we have active subscribers - send a message to a topic
// or send nil instead
- if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok {
- return fbsMsg, nil
+ if ok := p.storage.Contains(m.GetTopics()[i]); ok {
+ return m, nil
}
}
return nil, nil
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 4c0edcad..6ddd609c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -23,9 +23,10 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/memory"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -79,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
+ // attach default driver
+ p.pubsubs["memory"] = memory.NewInMemory(p.log)
+
return nil
}
@@ -301,16 +305,21 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ return err
+ }
+
// Get payload
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
return nil
@@ -320,16 +329,21 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ p.log.Error("message unmarshal")
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
p.log.Error("publish async error", "error", err)
- return
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
}()
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 544f3ede..1a7c6f8a 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -1,25 +1,22 @@
package pool
import (
- "bytes"
"sync"
"github.com/fasthttp/websocket"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
storage map[string]pubsub.PubSub
connections *sync.Map
resPool sync.Pool
- bPool sync.Pool
log logger.Logger
- queue chan *message.Message
+ queue chan *websocketsv1.Message
exit chan struct{}
}
@@ -27,7 +24,7 @@ type WorkersPool struct {
func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *message.Message, 100),
+ queue: make(chan *websocketsv1.Message, 100),
storage: pubsubs,
log: log,
exit: make(chan struct{}),
@@ -36,9 +33,6 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
wp.resPool.New = func() interface{} {
return make(map[string]struct{}, 10)
}
- wp.bPool.New = func() interface{} {
- return new(bytes.Buffer)
- }
// start 10 workers
for i := 0; i < 50; i++ {
@@ -48,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
return wp
}
-func (wp *WorkersPool) Queue(msg *message.Message) {
+func (wp *WorkersPool) Queue(msg *websocketsv1.Message) {
wp.queue <- msg
}
@@ -73,15 +67,6 @@ func (wp *WorkersPool) get() map[string]struct{} {
return wp.resPool.Get().(map[string]struct{})
}
-func (wp *WorkersPool) putBytes(b *bytes.Buffer) {
- b.Reset()
- wp.bPool.Put(b)
-}
-
-func (wp *WorkersPool) getBytes() *bytes.Buffer {
- return wp.bPool.Get().(*bytes.Buffer)
-}
-
func (wp *WorkersPool) do() { //nolint:gocognit
go func() {
for {
@@ -94,29 +79,27 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if msg == nil {
continue
}
- if msg.TopicsLength() == 0 {
+ if len(msg.GetTopics()) == 0 {
continue
}
- br, ok := wp.storage[utils.AsString(msg.Broker())]
+ br, ok := wp.storage[msg.Broker]
if !ok {
- wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage)
+ wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage)
continue
}
res := wp.get()
- bb := wp.getBytes()
- for i := 0; i < msg.TopicsLength(); i++ {
+ for i := 0; i < len(msg.GetTopics()); i++ {
// get connections for the particular topic
- br.Connections(utils.AsString(msg.Topics(i)), res)
+ br.Connections(msg.GetTopics()[i], res)
}
if len(res) == 0 {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Info("no such topic", "topic", msg.GetTopics()[i])
}
- wp.putBytes(bb)
wp.put(res)
continue
}
@@ -124,8 +107,8 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
@@ -133,20 +116,15 @@ func (wp *WorkersPool) do() { //nolint:gocognit
conn := c.(*connection.Connection)
// put data into the bytes buffer
- for i := 0; i < msg.PayloadLength(); i++ {
- bb.WriteByte(byte(msg.Payload(i)))
- }
- err := conn.Write(websocket.BinaryMessage, bb.Bytes())
+ err := conn.Write(websocket.BinaryMessage, msg.GetPayload())
if err != nil {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
}
- // put bytes buffer back
- wp.putBytes(bb)
// put map with results back
wp.put(res)
case <-wp.exit:
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 6c2cacb4..00c1dd91 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,10 +1,10 @@
package websockets
import (
- flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
)
// rpc collectors struct
@@ -13,39 +13,32 @@ type rpc struct {
log logger.Logger
}
-// Publish ... msg is a flatbuffers decoded payload
+// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(msg []byte, ok *bool) error {
+func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("broadcast_publish")
- r.log.Debug("message published")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
+ r.log.Debug("message published", "msg", in.Messages)
- b := flatbuffers.NewBuilder(100)
+ msgLen := len(in.GetMessages())
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
-
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
- err := r.plugin.Publish(bb)
+ err = r.plugin.Publish(bb)
if err != nil {
*ok = false
- b.Reset()
return errors.E(op, err)
}
- b.Reset()
}
*ok = true
@@ -54,68 +47,28 @@ func (r *rpc) Publish(msg []byte, ok *bool) error {
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
- r.log.Debug("message published", "msg", msg)
+func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
+ const op = errors.Op("publish_async")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
-
- b := flatbuffers.NewBuilder(100)
+ r.log.Debug("message published", "msg", in.Messages)
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
+ msgLen := len(in.GetMessages())
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
r.plugin.PublishAsync(bb)
- b.Reset()
}
*ok = true
return nil
}
-
-func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
- cmdOff := b.CreateByteString(msg.Command())
- brokerOff := b.CreateByteString(msg.Broker())
-
- offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
- for j := msg.TopicsLength() - 1; j >= 0; j-- {
- offsets[j] = b.CreateByteString(msg.Topics(j))
- }
-
- message.MessageStartTopicsVector(b, len(offsets))
-
- for j := len(offsets) - 1; j >= 0; j-- {
- b.PrependUOffsetT(offsets[j])
- }
-
- tOff := b.EndVector(len(offsets))
- bb := make([]byte, msg.PayloadLength())
- for i := 0; i < msg.PayloadLength(); i++ {
- bb[i] = byte(msg.Payload(i))
- }
- pOff := b.CreateByteVector(bb)
-
- message.MessageStart(b)
-
- message.MessageAddCommand(b, cmdOff)
- message.MessageAddBroker(b, brokerOff)
- message.MessageAddTopics(b, tOff)
- message.MessageAddPayload(b, pOff)
-
- fOff := message.MessageEnd(b)
- b.Finish(fOff)
-
- return b.FinishedBytes()
-}
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index b4122e8a..760b6951 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -10,66 +10,20 @@ import (
"testing"
"time"
- flatbuffers "github.com/google/flatbuffers/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ payload "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated"
"github.com/spiral/roadrunner/v2/plugins/logger"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/stretchr/testify/assert"
)
-func makePayload(b *flatbuffers.Builder, storage string, items []kv.Item) []byte {
- b.Reset()
-
- storageOffset := b.CreateString(storage)
-
- // //////////////////// ITEMS VECTOR ////////////////////////////
- offset := make([]flatbuffers.UOffsetT, len(items))
- for i := len(items) - 1; i >= 0; i-- {
- offset[i] = serializeItems(b, items[i])
- }
-
- generated.PayloadStartItemsVector(b, len(offset))
-
- for i := len(offset) - 1; i >= 0; i-- {
- b.PrependUOffsetT(offset[i])
- }
-
- itemsOffset := b.EndVector(len(offset))
- // /////////////////////////////////////////////////////////////////
-
- generated.PayloadStart(b)
- generated.PayloadAddItems(b, itemsOffset)
- generated.PayloadAddStorage(b, storageOffset)
-
- finalOffset := generated.PayloadEnd(b)
-
- b.Finish(finalOffset)
-
- return b.Bytes[b.Head():]
-}
-
-func serializeItems(b *flatbuffers.Builder, item kv.Item) flatbuffers.UOffsetT {
- key := b.CreateString(item.Key)
- val := b.CreateString(item.Value)
- ttl := b.CreateString(item.TTL)
-
- generated.ItemStart(b)
-
- generated.ItemAddKey(b, key)
- generated.ItemAddValue(b, val)
- generated.ItemAddTimeout(b, ttl)
-
- return generated.ItemEnd(b)
-}
-
func TestKVInit(t *testing.T) {
cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
assert.NoError(t, err)
@@ -153,18 +107,19 @@ func kvSetTest(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
-
- b := flatbuffers.NewBuilder(100)
- args := makePayload(b, "boltdb-south", []kv.Item{
- {
- Key: "key",
- Value: "val",
+ p := &payload.Payload{
+ Storage: "boltdb-south",
+ Items: []*payload.Item{
+ {
+ Key: "key",
+ Value: "val",
+ },
},
- })
+ }
var ok bool
- err = client.Call("kv.Set", args, &ok)
+ err = client.Call("kv.Set", p, &ok)
assert.NoError(t, err)
assert.True(t, ok, "Set return result")
}
@@ -174,17 +129,19 @@ func kvHasTest(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
-
- b := flatbuffers.NewBuilder(100)
- args := makePayload(b, "boltdb-south", []kv.Item{
- {
- Key: "key",
- Value: "val",
+ p := &payload.Payload{
+ Storage: "boltdb-south",
+ Items: []*payload.Item{
+ {
+ Key: "key",
+ Value: "val",
+ },
},
- })
+ }
+
var ret map[string]bool
- err = client.Call("kv.Has", args, &ret)
+ err = client.Call("kv.Has", p, &ret)
assert.NoError(t, err)
}
@@ -250,7 +207,7 @@ func TestBoltDb(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("testBoltDbRPCMethods", testRPCMethods)
+ t.Run("BOLTDB", testRPCMethods)
stopCh <- struct{}{}
wg.Wait()
@@ -264,40 +221,48 @@ func testRPCMethods(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
- },
- {
- Key: "c",
- },
- })
- data := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "a",
- Value: "aa",
- },
- {
- Key: "b",
- Value: "bb",
- },
- {
- Key: "c",
- Value: "cc",
- TTL: tt,
- },
- {
- Key: "d",
- Value: "dd",
+
+ keys := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "c",
+ },
},
- {
- Key: "e",
- Value: "ee",
+ }
+
+ data := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: "aa",
+ },
+ {
+ Key: "b",
+ Value: "bb",
+ },
+ {
+ Key: "c",
+ Value: "cc",
+ Timeout: tt,
+ },
+ {
+ Key: "d",
+ Value: "dd",
+ },
+ {
+ Key: "e",
+ Value: "ee",
+ },
},
- })
+ }
var setRes bool
@@ -327,20 +292,24 @@ func testRPCMethods(t *testing.T) {
assert.Equal(t, "bb", mGet["b"].(string))
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "a",
- TTL: tt2,
- },
- {
- Key: "b",
- TTL: tt2,
- },
- {
- Key: "d",
- TTL: tt2,
+
+ data2 := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Timeout: tt2,
+ },
+ {
+ Key: "b",
+ Timeout: tt2,
+ },
+ {
+ Key: "d",
+ Timeout: tt2,
+ },
},
- })
+ }
// MEXPIRE
var mExpRes bool
@@ -349,17 +318,21 @@ func testRPCMethods(t *testing.T) {
assert.True(t, mExpRes)
// TTL
- keys2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
+ keys2 := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "d",
+ },
},
- {
- Key: "d",
- },
- })
+ }
+
ttlRes := make(map[string]interface{})
err = client.Call("kv.TTL", keys2, &ttlRes)
assert.NoError(t, err)
@@ -373,11 +346,15 @@ func testRPCMethods(t *testing.T) {
assert.Len(t, ret, 0)
// DELETE
- keysDel := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{
- {
- Key: "e",
+ keysDel := &payload.Payload{
+ Storage: "boltdb-rr",
+ Items: []*payload.Item{
+ {
+ Key: "e",
+ },
},
- })
+ }
+
var delRet bool
err = client.Call("kv.Delete", keysDel, &delRet)
assert.NoError(t, err)
@@ -464,40 +441,48 @@ func testRPCMethodsMemcached(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
- },
- {
- Key: "c",
- },
- })
- data := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "a",
- Value: "aa",
- },
- {
- Key: "b",
- Value: "bb",
- },
- {
- Key: "c",
- Value: "cc",
- TTL: tt,
- },
- {
- Key: "d",
- Value: "dd",
+
+ keys := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "c",
+ },
},
- {
- Key: "e",
- Value: "ee",
+ }
+
+ data := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: "aa",
+ },
+ {
+ Key: "b",
+ Value: "bb",
+ },
+ {
+ Key: "c",
+ Value: "cc",
+ Timeout: tt,
+ },
+ {
+ Key: "d",
+ Value: "dd",
+ },
+ {
+ Key: "e",
+ Value: "ee",
+ },
},
- })
+ }
var setRes bool
@@ -527,20 +512,24 @@ func testRPCMethodsMemcached(t *testing.T) {
assert.Equal(t, string("bb"), string(mGet["b"].([]byte)))
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "a",
- TTL: tt2,
- },
- {
- Key: "b",
- TTL: tt2,
- },
- {
- Key: "d",
- TTL: tt2,
+
+ data2 := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Timeout: tt2,
+ },
+ {
+ Key: "b",
+ Timeout: tt2,
+ },
+ {
+ Key: "d",
+ Timeout: tt2,
+ },
},
- })
+ }
// MEXPIRE
var mExpRes bool
@@ -549,17 +538,21 @@ func testRPCMethodsMemcached(t *testing.T) {
assert.True(t, mExpRes)
// TTL call is not supported for the memcached driver
- keys2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "a",
+ keys2 := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "d",
+ },
},
- {
- Key: "b",
- },
- {
- Key: "d",
- },
- })
+ }
+
ttlRes := make(map[string]interface{})
err = client.Call("kv.TTL", keys2, &ttlRes)
assert.Error(t, err)
@@ -573,11 +566,15 @@ func testRPCMethodsMemcached(t *testing.T) {
assert.Len(t, ret, 0)
// DELETE
- keysDel := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{
- {
- Key: "e",
+ keysDel := &payload.Payload{
+ Storage: "memcached-rr",
+ Items: []*payload.Item{
+ {
+ Key: "e",
+ },
},
- })
+ }
+
var delRet bool
err = client.Call("kv.Delete", keysDel, &delRet)
assert.NoError(t, err)
@@ -664,40 +661,47 @@ func testRPCMethodsInMemory(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "a",
+ keys := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "c",
+ },
},
- {
- Key: "b",
- },
- {
- Key: "c",
- },
- })
- data := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "a",
- Value: "aa",
- },
- {
- Key: "b",
- Value: "bb",
- },
- {
- Key: "c",
- Value: "cc",
- TTL: tt,
- },
- {
- Key: "d",
- Value: "dd",
- },
- {
- Key: "e",
- Value: "ee",
+ }
+
+ data := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: "aa",
+ },
+ {
+ Key: "b",
+ Value: "bb",
+ },
+ {
+ Key: "c",
+ Value: "cc",
+ Timeout: tt,
+ },
+ {
+ Key: "d",
+ Value: "dd",
+ },
+ {
+ Key: "e",
+ Value: "ee",
+ },
},
- })
+ }
var setRes bool
@@ -727,20 +731,24 @@ func testRPCMethodsInMemory(t *testing.T) {
assert.Equal(t, "bb", mGet["b"].(string))
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "a",
- TTL: tt2,
- },
- {
- Key: "b",
- TTL: tt2,
- },
- {
- Key: "d",
- TTL: tt2,
+
+ data2 := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Timeout: tt2,
+ },
+ {
+ Key: "b",
+ Timeout: tt2,
+ },
+ {
+ Key: "d",
+ Timeout: tt2,
+ },
},
- })
+ }
// MEXPIRE
var mExpRes bool
@@ -749,17 +757,21 @@ func testRPCMethodsInMemory(t *testing.T) {
assert.True(t, mExpRes)
// TTL
- keys2 := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
+ keys2 := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "d",
+ },
},
- {
- Key: "d",
- },
- })
+ }
+
ttlRes := make(map[string]interface{})
err = client.Call("kv.TTL", keys2, &ttlRes)
assert.NoError(t, err)
@@ -773,11 +785,15 @@ func testRPCMethodsInMemory(t *testing.T) {
assert.Len(t, ret, 0)
// DELETE
- keysDel := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{
- {
- Key: "e",
+ keysDel := &payload.Payload{
+ Storage: "memory-rr",
+ Items: []*payload.Item{
+ {
+ Key: "e",
+ },
},
- })
+ }
+
var delRet bool
err = client.Call("kv.Delete", keysDel, &delRet)
assert.NoError(t, err)
@@ -864,40 +880,47 @@ func testRPCMethodsRedis(t *testing.T) {
// add 5 second ttl
tt := time.Now().Add(time.Second * 5).Format(time.RFC3339)
- keys := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
- },
- {
- Key: "c",
- },
- })
- data := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "a",
- Value: "aa",
- },
- {
- Key: "b",
- Value: "bb",
- },
- {
- Key: "c",
- Value: "cc",
- TTL: tt,
- },
- {
- Key: "d",
- Value: "dd",
+ keys := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "c",
+ },
},
- {
- Key: "e",
- Value: "ee",
+ }
+
+ data := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Value: "aa",
+ },
+ {
+ Key: "b",
+ Value: "bb",
+ },
+ {
+ Key: "c",
+ Value: "cc",
+ Timeout: tt,
+ },
+ {
+ Key: "d",
+ Value: "dd",
+ },
+ {
+ Key: "e",
+ Value: "ee",
+ },
},
- })
+ }
var setRes bool
@@ -927,20 +950,23 @@ func testRPCMethodsRedis(t *testing.T) {
assert.Equal(t, "bb", mGet["b"].(string))
tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339)
- data2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "a",
- TTL: tt2,
+ data2 := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ Timeout: tt2,
+ },
+ {
+ Key: "b",
+ Timeout: tt2,
+ },
+ {
+ Key: "d",
+ Timeout: tt2,
+ },
},
- {
- Key: "b",
- TTL: tt2,
- },
- {
- Key: "d",
- TTL: tt2,
- },
- })
+ }
// MEXPIRE
var mExpRes bool
@@ -949,17 +975,21 @@ func testRPCMethodsRedis(t *testing.T) {
assert.True(t, mExpRes)
// TTL
- keys2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "a",
+ keys2 := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "a",
+ },
+ {
+ Key: "b",
+ },
+ {
+ Key: "d",
+ },
},
- {
- Key: "b",
- },
- {
- Key: "d",
- },
- })
+ }
+
ttlRes := make(map[string]interface{})
err = client.Call("kv.TTL", keys2, &ttlRes)
assert.NoError(t, err)
@@ -973,11 +1003,15 @@ func testRPCMethodsRedis(t *testing.T) {
assert.Len(t, ret, 0)
// DELETE
- keysDel := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{
- {
- Key: "e",
+ keysDel := &payload.Payload{
+ Storage: "redis-rr",
+ Items: []*payload.Item{
+ {
+ Key: "e",
+ },
},
- })
+ }
+
var delRet bool
err = client.Call("kv.Delete", keysDel, &delRet)
assert.NoError(t, err)
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index b2c756bf..4e4c09f1 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -13,16 +13,13 @@ import (
"time"
"github.com/fasthttp/websocket"
- flatbuffers "github.com/google/flatbuffers/go"
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/redis"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -170,7 +167,6 @@ func TestWSRedisAndMemory(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
)
assert.NoError(t, err)
@@ -220,9 +216,9 @@ func TestWSRedisAndMemory(t *testing.T) {
}()
time.Sleep(time.Second * 1)
+ t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
t.Run("RPCWsMemory", RPCWsMemory)
t.Run("RPCWsRedis", RPCWsRedis)
- t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
stopCh <- struct{}{}
@@ -464,7 +460,6 @@ func TestWSMemoryDeny(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
)
assert.NoError(t, err)
@@ -592,7 +587,6 @@ func TestWSMemoryStop(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
)
assert.NoError(t, err)
@@ -685,7 +679,6 @@ func TestWSMemoryOk(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
- &memory.Plugin{},
)
assert.NoError(t, err)
@@ -877,8 +870,8 @@ func publish2(command string, broker string, topics ...string) {
panic(err)
}
}
-func messageWS(command string, broker string, payload []byte, topics ...string) *Msg {
- return &Msg{
+func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message {
+ return &websocketsv1.Message{
Topics: topics,
Command: command,
Broker: broker,
@@ -886,70 +879,17 @@ func messageWS(command string, broker string, payload []byte, topics ...string)
}
}
-func makeMessage(command string, broker string, payload []byte, topics ...string) []byte {
- m := []pubsub.Message{
- {
- Topics: topics,
- Command: command,
- Broker: broker,
- Payload: payload,
+func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Messages {
+ m := &websocketsv1.Messages{
+ Messages: []*websocketsv1.Message{
+ {
+ Topics: topics,
+ Command: command,
+ Broker: broker,
+ Payload: payload,
+ },
},
}
- b := flatbuffers.NewBuilder(1)
-
- return msgs(b, m)
-}
-
-func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT {
- cmdOff := b.CreateString(msg.Command)
- brokerOff := b.CreateString(msg.Broker)
-
- offsets := make([]flatbuffers.UOffsetT, len(msg.Topics))
- for j := len(msg.Topics) - 1; j >= 0; j-- {
- offsets[j] = b.CreateString(msg.Topics[j])
- }
-
- message.MessageStartTopicsVector(b, len(offsets))
-
- for j := len(offsets) - 1; j >= 0; j-- {
- b.PrependUOffsetT(offsets[j])
- }
-
- tOff := b.EndVector(len(offsets))
- pOff := b.CreateByteVector(msg.Payload)
-
- message.MessageStart(b)
-
- message.MessageAddCommand(b, cmdOff)
- message.MessageAddBroker(b, brokerOff)
- message.MessageAddTopics(b, tOff)
- message.MessageAddPayload(b, pOff)
-
- return message.MessageEnd(b)
-}
-
-func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte {
- b.Reset()
-
- mOff := make([]flatbuffers.UOffsetT, len(msgs))
-
- for i := len(msgs) - 1; i >= 0; i-- {
- mOff[i] = serializeMsg(b, msgs[i])
- }
-
- message.MessagesStartMessagesVector(b, len(mOff))
-
- for i := len(mOff) - 1; i >= 0; i-- {
- b.PrependUOffsetT(mOff[i])
- }
-
- msgsOff := b.EndVector(len(msgs))
-
- message.MessagesStart(b)
- message.MessagesAddMessages(b, msgsOff)
- fOff := message.MessagesEnd(b)
- b.Finish(fOff)
-
- return b.Bytes[b.Head():]
+ return m
}