summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-08 22:04:28 +0300
committerValery Piashchynski <[email protected]>2021-06-08 22:04:28 +0300
commitcc271dceb13d3929f0382311dfce3dfed2ce04ce (patch)
tree13c4c3f380d8309b95c9600cc2000d1d5ab87cda
parenta8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (diff)
- Add protobuf versioning
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/proto/kv/v1beta/kv.pb.go (renamed from plugins/kv/payload/payload.pb.go)100
-rw-r--r--pkg/proto/kv/v1beta/kv.proto (renamed from plugins/kv/payload.proto)4
-rw-r--r--pkg/proto/websockets/v1beta/websockets.pb.go (renamed from pkg/pubsub/message/message.pb.go)100
-rw-r--r--pkg/proto/websockets/v1beta/websockets.proto (renamed from pkg/pubsub/message.proto)4
-rw-r--r--pkg/pubsub/interface.go4
-rw-r--r--plugins/kv/drivers/boltdb/driver.go6
-rw-r--r--plugins/kv/drivers/memcached/driver.go6
-rw-r--r--plugins/kv/drivers/memory/driver.go18
-rw-r--r--plugins/kv/drivers/redis/driver.go6
-rw-r--r--plugins/kv/interface.go6
-rw-r--r--plugins/kv/rpc.go14
-rw-r--r--plugins/redis/fanin.go10
-rw-r--r--plugins/redis/plugin.go8
-rw-r--r--plugins/websockets/executor/executor.go4
-rw-r--r--plugins/websockets/memory/inMemory.go6
-rw-r--r--plugins/websockets/plugin.go14
-rw-r--r--plugins/websockets/pool/workers_pool.go8
-rw-r--r--plugins/websockets/rpc.go6
-rw-r--r--tests/plugins/kv/storage_plugin_test.go4
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go14
20 files changed, 171 insertions, 171 deletions
diff --git a/plugins/kv/payload/payload.pb.go b/pkg/proto/kv/v1beta/kv.pb.go
index 60ad9f13..76450869 100644
--- a/plugins/kv/payload/payload.pb.go
+++ b/pkg/proto/kv/v1beta/kv.pb.go
@@ -2,9 +2,9 @@
// versions:
// protoc-gen-go v1.26.0
// protoc v3.16.0
-// source: payload.proto
+// source: kv.proto
-package payload
+package kvv1beta
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
@@ -33,7 +33,7 @@ type Payload struct {
func (x *Payload) Reset() {
*x = Payload{}
if protoimpl.UnsafeEnabled {
- mi := &file_payload_proto_msgTypes[0]
+ mi := &file_kv_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -46,7 +46,7 @@ func (x *Payload) String() string {
func (*Payload) ProtoMessage() {}
func (x *Payload) ProtoReflect() protoreflect.Message {
- mi := &file_payload_proto_msgTypes[0]
+ mi := &file_kv_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -59,7 +59,7 @@ func (x *Payload) ProtoReflect() protoreflect.Message {
// Deprecated: Use Payload.ProtoReflect.Descriptor instead.
func (*Payload) Descriptor() ([]byte, []int) {
- return file_payload_proto_rawDescGZIP(), []int{0}
+ return file_kv_proto_rawDescGZIP(), []int{0}
}
func (x *Payload) GetStorage() string {
@@ -81,15 +81,16 @@ type Item struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
- Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+ Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+ Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+ // RFC 3339
Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"`
}
func (x *Item) Reset() {
*x = Item{}
if protoimpl.UnsafeEnabled {
- mi := &file_payload_proto_msgTypes[1]
+ mi := &file_kv_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -102,7 +103,7 @@ func (x *Item) String() string {
func (*Item) ProtoMessage() {}
func (x *Item) ProtoReflect() protoreflect.Message {
- mi := &file_payload_proto_msgTypes[1]
+ mi := &file_kv_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -115,7 +116,7 @@ func (x *Item) ProtoReflect() protoreflect.Message {
// Deprecated: Use Item.ProtoReflect.Descriptor instead.
func (*Item) Descriptor() ([]byte, []int) {
- return file_payload_proto_rawDescGZIP(), []int{1}
+ return file_kv_proto_rawDescGZIP(), []int{1}
}
func (x *Item) GetKey() string {
@@ -139,42 +140,43 @@ func (x *Item) GetTimeout() string {
return ""
}
-var File_payload_proto protoreflect.FileDescriptor
+var File_kv_proto protoreflect.FileDescriptor
-var file_payload_proto_rawDesc = []byte{
- 0x0a, 0x0d, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
- 0x02, 0x6b, 0x76, 0x22, 0x43, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18,
- 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d,
- 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x6b, 0x76, 0x2e, 0x49, 0x74, 0x65,
- 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d,
- 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
- 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
- 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65,
- 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f,
- 0x75, 0x74, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x62,
- 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+var file_kv_proto_rawDesc = []byte{
+ 0x0a, 0x08, 0x6b, 0x76, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x6b, 0x76, 0x2e, 0x76,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x4a, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+ 0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x69, 0x74,
+ 0x65, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6b, 0x76, 0x2e, 0x76,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d,
+ 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79,
+ 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76,
+ 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75,
+ 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01,
+ 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0d, 0x5a, 0x0b, 0x2e,
+ 0x2f, 0x3b, 0x6b, 0x76, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
+ 0x6f, 0x33,
}
var (
- file_payload_proto_rawDescOnce sync.Once
- file_payload_proto_rawDescData = file_payload_proto_rawDesc
+ file_kv_proto_rawDescOnce sync.Once
+ file_kv_proto_rawDescData = file_kv_proto_rawDesc
)
-func file_payload_proto_rawDescGZIP() []byte {
- file_payload_proto_rawDescOnce.Do(func() {
- file_payload_proto_rawDescData = protoimpl.X.CompressGZIP(file_payload_proto_rawDescData)
+func file_kv_proto_rawDescGZIP() []byte {
+ file_kv_proto_rawDescOnce.Do(func() {
+ file_kv_proto_rawDescData = protoimpl.X.CompressGZIP(file_kv_proto_rawDescData)
})
- return file_payload_proto_rawDescData
+ return file_kv_proto_rawDescData
}
-var file_payload_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
-var file_payload_proto_goTypes = []interface{}{
- (*Payload)(nil), // 0: kv.Payload
- (*Item)(nil), // 1: kv.Item
+var file_kv_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_kv_proto_goTypes = []interface{}{
+ (*Payload)(nil), // 0: kv.v1beta.Payload
+ (*Item)(nil), // 1: kv.v1beta.Item
}
-var file_payload_proto_depIdxs = []int32{
- 1, // 0: kv.Payload.items:type_name -> kv.Item
+var file_kv_proto_depIdxs = []int32{
+ 1, // 0: kv.v1beta.Payload.items:type_name -> kv.v1beta.Item
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
@@ -182,13 +184,13 @@ var file_payload_proto_depIdxs = []int32{
0, // [0:1] is the sub-list for field type_name
}
-func init() { file_payload_proto_init() }
-func file_payload_proto_init() {
- if File_payload_proto != nil {
+func init() { file_kv_proto_init() }
+func file_kv_proto_init() {
+ if File_kv_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
- file_payload_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ file_kv_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Payload); i {
case 0:
return &v.state
@@ -200,7 +202,7 @@ func file_payload_proto_init() {
return nil
}
}
- file_payload_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ file_kv_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Item); i {
case 0:
return &v.state
@@ -217,18 +219,18 @@ func file_payload_proto_init() {
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
- RawDescriptor: file_payload_proto_rawDesc,
+ RawDescriptor: file_kv_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},
- GoTypes: file_payload_proto_goTypes,
- DependencyIndexes: file_payload_proto_depIdxs,
- MessageInfos: file_payload_proto_msgTypes,
+ GoTypes: file_kv_proto_goTypes,
+ DependencyIndexes: file_kv_proto_depIdxs,
+ MessageInfos: file_kv_proto_msgTypes,
}.Build()
- File_payload_proto = out.File
- file_payload_proto_rawDesc = nil
- file_payload_proto_goTypes = nil
- file_payload_proto_depIdxs = nil
+ File_kv_proto = out.File
+ file_kv_proto_rawDesc = nil
+ file_kv_proto_goTypes = nil
+ file_kv_proto_depIdxs = nil
}
diff --git a/plugins/kv/payload.proto b/pkg/proto/kv/v1beta/kv.proto
index 66a22ee4..1ec0e6b7 100644
--- a/plugins/kv/payload.proto
+++ b/pkg/proto/kv/v1beta/kv.proto
@@ -1,7 +1,7 @@
syntax = "proto3";
-package kv.v1;
-option go_package = "./payload";
+package kv.v1beta;
+option go_package = "./;kvv1beta";
message Payload {
// could be an enum in the future
diff --git a/pkg/pubsub/message/message.pb.go b/pkg/proto/websockets/v1beta/websockets.pb.go
index 3a73c39c..f04c3cb2 100644
--- a/pkg/pubsub/message/message.pb.go
+++ b/pkg/proto/websockets/v1beta/websockets.pb.go
@@ -2,9 +2,9 @@
// versions:
// protoc-gen-go v1.26.0
// protoc v3.16.0
-// source: message.proto
+// source: websockets.proto
-package message
+package websocketsv1beta
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
@@ -34,7 +34,7 @@ type Message struct {
func (x *Message) Reset() {
*x = Message{}
if protoimpl.UnsafeEnabled {
- mi := &file_message_proto_msgTypes[0]
+ mi := &file_websockets_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -47,7 +47,7 @@ func (x *Message) String() string {
func (*Message) ProtoMessage() {}
func (x *Message) ProtoReflect() protoreflect.Message {
- mi := &file_message_proto_msgTypes[0]
+ mi := &file_websockets_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -60,7 +60,7 @@ func (x *Message) ProtoReflect() protoreflect.Message {
// Deprecated: Use Message.ProtoReflect.Descriptor instead.
func (*Message) Descriptor() ([]byte, []int) {
- return file_message_proto_rawDescGZIP(), []int{0}
+ return file_websockets_proto_rawDescGZIP(), []int{0}
}
func (x *Message) GetCommand() string {
@@ -102,7 +102,7 @@ type Messages struct {
func (x *Messages) Reset() {
*x = Messages{}
if protoimpl.UnsafeEnabled {
- mi := &file_message_proto_msgTypes[1]
+ mi := &file_websockets_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -115,7 +115,7 @@ func (x *Messages) String() string {
func (*Messages) ProtoMessage() {}
func (x *Messages) ProtoReflect() protoreflect.Message {
- mi := &file_message_proto_msgTypes[1]
+ mi := &file_websockets_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -128,7 +128,7 @@ func (x *Messages) ProtoReflect() protoreflect.Message {
// Deprecated: Use Messages.ProtoReflect.Descriptor instead.
func (*Messages) Descriptor() ([]byte, []int) {
- return file_message_proto_rawDescGZIP(), []int{1}
+ return file_websockets_proto_rawDescGZIP(), []int{1}
}
func (x *Messages) GetMessages() []*Message {
@@ -138,44 +138,46 @@ func (x *Messages) GetMessages() []*Message {
return nil
}
-var File_message_proto protoreflect.FileDescriptor
+var File_websockets_proto protoreflect.FileDescriptor
-var file_message_proto_rawDesc = []byte{
- 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
- 0x09, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x76, 0x31, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65,
- 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
- 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12,
- 0x16, 0x0a, 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63,
- 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12,
- 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c,
- 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x3a, 0x0a, 0x08, 0x4d, 0x65, 0x73,
- 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
- 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62,
- 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73,
- 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61,
- 0x67, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+var file_websockets_proto_rawDesc = []byte{
+ 0x0a, 0x10, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f,
+ 0x74, 0x6f, 0x12, 0x11, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76,
+ 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
+ 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x72,
+ 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f, 0x6b,
+ 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x03,
+ 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61,
+ 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79,
+ 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x42, 0x0a, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73,
+ 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03,
+ 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e,
+ 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08,
+ 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77,
+ 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62,
+ 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
- file_message_proto_rawDescOnce sync.Once
- file_message_proto_rawDescData = file_message_proto_rawDesc
+ file_websockets_proto_rawDescOnce sync.Once
+ file_websockets_proto_rawDescData = file_websockets_proto_rawDesc
)
-func file_message_proto_rawDescGZIP() []byte {
- file_message_proto_rawDescOnce.Do(func() {
- file_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_message_proto_rawDescData)
+func file_websockets_proto_rawDescGZIP() []byte {
+ file_websockets_proto_rawDescOnce.Do(func() {
+ file_websockets_proto_rawDescData = protoimpl.X.CompressGZIP(file_websockets_proto_rawDescData)
})
- return file_message_proto_rawDescData
+ return file_websockets_proto_rawDescData
}
-var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
-var file_message_proto_goTypes = []interface{}{
- (*Message)(nil), // 0: pubsub.v1.Message
- (*Messages)(nil), // 1: pubsub.v1.Messages
+var file_websockets_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_websockets_proto_goTypes = []interface{}{
+ (*Message)(nil), // 0: websockets.v1beta.Message
+ (*Messages)(nil), // 1: websockets.v1beta.Messages
}
-var file_message_proto_depIdxs = []int32{
- 0, // 0: pubsub.v1.Messages.messages:type_name -> pubsub.v1.Message
+var file_websockets_proto_depIdxs = []int32{
+ 0, // 0: websockets.v1beta.Messages.messages:type_name -> websockets.v1beta.Message
1, // [1:1] is the sub-list for method output_type
1, // [1:1] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
@@ -183,13 +185,13 @@ var file_message_proto_depIdxs = []int32{
0, // [0:1] is the sub-list for field type_name
}
-func init() { file_message_proto_init() }
-func file_message_proto_init() {
- if File_message_proto != nil {
+func init() { file_websockets_proto_init() }
+func file_websockets_proto_init() {
+ if File_websockets_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
- file_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ file_websockets_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Message); i {
case 0:
return &v.state
@@ -201,7 +203,7 @@ func file_message_proto_init() {
return nil
}
}
- file_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ file_websockets_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Messages); i {
case 0:
return &v.state
@@ -218,18 +220,18 @@ func file_message_proto_init() {
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
- RawDescriptor: file_message_proto_rawDesc,
+ RawDescriptor: file_websockets_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},
- GoTypes: file_message_proto_goTypes,
- DependencyIndexes: file_message_proto_depIdxs,
- MessageInfos: file_message_proto_msgTypes,
+ GoTypes: file_websockets_proto_goTypes,
+ DependencyIndexes: file_websockets_proto_depIdxs,
+ MessageInfos: file_websockets_proto_msgTypes,
}.Build()
- File_message_proto = out.File
- file_message_proto_rawDesc = nil
- file_message_proto_goTypes = nil
- file_message_proto_depIdxs = nil
+ File_websockets_proto = out.File
+ file_websockets_proto_rawDesc = nil
+ file_websockets_proto_goTypes = nil
+ file_websockets_proto_depIdxs = nil
}
diff --git a/pkg/pubsub/message.proto b/pkg/proto/websockets/v1beta/websockets.proto
index 772e7611..a61da93d 100644
--- a/pkg/pubsub/message.proto
+++ b/pkg/proto/websockets/v1beta/websockets.proto
@@ -1,7 +1,7 @@
syntax = "proto3";
-package pubsub.v1;
-option go_package = "./message";
+package websockets.v1beta;
+option go_package = "./;websocketsv1beta";
message Message {
string command = 1;
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index eb65b4b7..4926cad6 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -1,6 +1,6 @@
package pubsub
-import "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+import websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
/*
This interface is in BETA. It might be changed.
@@ -42,5 +42,5 @@ type Publisher interface {
// Reader interface should return next message
type Reader interface {
- Next() (*message.Message, error)
+ Next() (*websocketsv1.Message, error)
}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index af107dff..ba873513 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -10,9 +10,9 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
@@ -214,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
}
// Set puts the K/V to the bolt
-func (d *Driver) Set(items ...*payload.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -324,7 +324,7 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (d *Driver) MExpire(items ...*payload.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("boltdb_driver_mexpire")
for i := range items {
if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" {
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index 9c4689c4..8ea515d0 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,9 +6,9 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -135,7 +135,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) Set(items ...*payload.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -176,7 +176,7 @@ func (d *Driver) Set(items ...*payload.Item) error {
// MExpire Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
-func (d *Driver) MExpire(items ...*payload.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("memcached_plugin_mexpire")
for i := range items {
if items[i] == nil {
diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go
index fae2c831..3158adee 100644
--- a/plugins/kv/drivers/memory/driver.go
+++ b/plugins/kv/drivers/memory/driver.go
@@ -6,9 +6,9 @@ import (
"time"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -72,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) {
if data, exist := s.heap.Load(key); exist {
// here might be a panic
// but data only could be a string, see Set function
- return utils.AsBytes(data.(*payload.Item).Value), nil
+ return utils.AsBytes(data.(*kvv1.Item).Value), nil
}
return nil, nil
}
@@ -95,14 +95,14 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if value, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = value.(*payload.Item).Value
+ m[keys[i]] = value.(*kvv1.Item).Value
}
}
return m, nil
}
-func (s *Driver) Set(items ...*payload.Item) error {
+func (s *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -128,7 +128,7 @@ func (s *Driver) Set(items ...*payload.Item) error {
// MExpire sets the expiration time to the key
// If key already has the expiration time, it will be overwritten
-func (s *Driver) MExpire(items ...*payload.Item) error {
+func (s *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("in_memory_plugin_mexpire")
for i := range items {
if items[i] == nil {
@@ -145,11 +145,11 @@ func (s *Driver) MExpire(items ...*payload.Item) error {
if err != nil {
return errors.E(op, err)
}
- tmp := pItem.(*payload.Item)
+ tmp := pItem.(*kvv1.Item)
// guess that t is in the future
// in memory is just FOR TESTING PURPOSES
// LOGIC ISN'T IDEAL
- s.heap.Store(items[i].Key, &payload.Item{
+ s.heap.Store(items[i].Key, &kvv1.Item{
Key: items[i].Key,
Value: tmp.Value,
Timeout: items[i].Timeout,
@@ -178,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) {
for i := range keys {
if item, ok := s.heap.Load(keys[i]); ok {
- m[keys[i]] = item.(*payload.Item).Timeout
+ m[keys[i]] = item.(*kvv1.Item).Timeout
}
}
return m, nil
@@ -216,7 +216,7 @@ func (s *Driver) gc() {
case now := <-ticker.C:
// check every second
s.heap.Range(func(key, value interface{}) bool {
- v := value.(*payload.Item)
+ v := value.(*kvv1.Item)
if v.Timeout == "" {
return true
}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go
index 5890367b..0aaa6352 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/kv/drivers/redis/driver.go
@@ -7,9 +7,9 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -138,7 +138,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) {
//
// Use expiration for `SETEX`-like behavior.
// Zero expiration means the key has no expiration time.
-func (d *Driver) Set(items ...*payload.Item) error {
+func (d *Driver) Set(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_set")
if items == nil {
return errors.E(op, errors.NoKeys)
@@ -187,7 +187,7 @@ func (d *Driver) Delete(keys ...string) error {
// MExpire https://redis.io/commands/expire
// timeout in RFC3339
-func (d *Driver) MExpire(items ...*payload.Item) error {
+func (d *Driver) MExpire(items ...*kvv1.Item) error {
const op = errors.Op("redis_driver_mexpire")
now := time.Now()
for _, item := range items {
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index abcd7f47..7841f9a2 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,6 +1,6 @@
package kv
-import "github.com/spiral/roadrunner/v2/plugins/kv/payload"
+import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
// Storage represents single abstract storage.
type Storage interface {
@@ -16,10 +16,10 @@ type Storage interface {
// Set used to upload item to KV with TTL
// 0 value in TTL means no TTL
- Set(items ...*payload.Item) error
+ Set(items ...*kvv1.Item) error
// MExpire sets the TTL for multiply keys
- MExpire(items ...*payload.Item) error
+ MExpire(items ...*kvv1.Item) error
// TTL return the rest time to live for provided keys
// Not supported for the memcached and boltdb
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index 009a5c56..a9efe0c4 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,7 +2,7 @@ package kv
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
+ kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -17,7 +17,7 @@ type rpc struct {
}
// Has accept []*payload.Payload proto payload with Storage and Item
-func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error {
+func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error {
const op = errors.Op("rpc_has")
if in.Storage == "" {
@@ -46,7 +46,7 @@ func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error {
}
// Set accept proto payload with Storage and Item
-func (r *rpc) Set(in *payload.Payload, ok *bool) error {
+func (r *rpc) Set(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_set")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -65,7 +65,7 @@ func (r *rpc) Set(in *payload.Payload, ok *bool) error {
}
// MGet accept proto payload with Storage and Item
-func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error {
+func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_mget")
keys := make([]string, 0, len(in.GetItems()))
@@ -89,7 +89,7 @@ func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error {
}
// MExpire accept proto payload with Storage and Item
-func (r *rpc) MExpire(in *payload.Payload, ok *bool) error {
+func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rpc_mexpire")
if st, exists := r.storages[in.GetStorage()]; exists {
@@ -108,7 +108,7 @@ func (r *rpc) MExpire(in *payload.Payload, ok *bool) error {
}
// TTL accept proto payload with Storage and Item
-func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error {
+func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error {
const op = errors.Op("rpc_ttl")
keys := make([]string, 0, len(in.GetItems()))
@@ -131,7 +131,7 @@ func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error {
}
// Delete accept proto payload with Storage and Item
-func (r *rpc) Delete(in *payload.Payload, ok *bool) error {
+func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error {
const op = errors.Op("rcp_delete")
keys := make([]string, 0, len(in.GetItems()))
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index 76bef400..ac9ebcc2 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,7 +4,7 @@ import (
"context"
"sync"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
@@ -23,13 +23,13 @@ type FanIn struct {
log logger.Logger
// out channel with all subs
- out chan *message.Message
+ out chan *websocketsv1.Message
exit chan struct{}
}
func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn {
- out := make(chan *message.Message, 100)
+ out := make(chan *websocketsv1.Message, 100)
fi := &FanIn{
out: out,
client: redisClient,
@@ -67,7 +67,7 @@ func (fi *FanIn) read() {
return
}
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(utils.AsBytes(msg.Payload), m)
if err != nil {
fi.log.Error("message unmarshal")
@@ -97,6 +97,6 @@ func (fi *FanIn) stop() error {
return nil
}
-func (fi *FanIn) consume() <-chan *message.Message {
+func (fi *FanIn) consume() <-chan *websocketsv1.Message {
return fi.out
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 695e7b08..47ffeb39 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -6,7 +6,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
@@ -107,7 +107,7 @@ func (p *Plugin) Publish(msg []byte) error {
p.Lock()
defer p.Unlock()
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(msg, m)
if err != nil {
return errors.E(err)
@@ -126,7 +126,7 @@ func (p *Plugin) PublishAsync(msg []byte) {
go func() {
p.Lock()
defer p.Unlock()
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(msg, m)
if err != nil {
p.log.Error("message unmarshal error")
@@ -209,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
// Next return next message
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
return <-p.fanin.consume(), nil
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 951c9a1a..e3d47166 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -8,8 +8,8 @@ import (
"github.com/fasthttp/websocket"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
@@ -64,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err = json.Unmarshal(data, msg)
if err != nil {
diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go
index deb927ed..cef28182 100644
--- a/plugins/websockets/memory/inMemory.go
+++ b/plugins/websockets/memory/inMemory.go
@@ -4,8 +4,8 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
@@ -67,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *Plugin) Next() (*message.Message, error) {
+func (p *Plugin) Next() (*websocketsv1.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
@@ -76,7 +76,7 @@ func (p *Plugin) Next() (*message.Message, error) {
p.RLock()
defer p.RUnlock()
- m := &message.Message{}
+ m := &websocketsv1.Message{}
err := proto.Unmarshal(msg, m)
if err != nil {
return nil, err
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index cf21fffa..6ddd609c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -80,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
+ // attach default driver
+ p.pubsubs["memory"] = memory.NewInMemory(p.log)
+
return nil
}
@@ -91,11 +94,6 @@ func (p *Plugin) Serve() chan error {
p.Lock()
defer p.Unlock()
- // attach default driver
- if len(p.pubsubs) == 0 {
- p.pubsubs["memory"] = memory.NewInMemory(p.log)
- }
-
p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{
Debug: p.cfg.Pool.Debug,
NumWorkers: p.cfg.Pool.NumWorkers,
@@ -307,7 +305,7 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
return err
@@ -331,7 +329,7 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- msg := &message.Message{}
+ msg := &websocketsv1.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
p.log.Error("message unmarshal")
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index efafb2d3..1a7c6f8a 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,8 +4,8 @@ import (
"sync"
"github.com/fasthttp/websocket"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
)
@@ -16,7 +16,7 @@ type WorkersPool struct {
resPool sync.Pool
log logger.Logger
- queue chan *message.Message
+ queue chan *websocketsv1.Message
exit chan struct{}
}
@@ -24,7 +24,7 @@ type WorkersPool struct {
func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *message.Message, 100),
+ queue: make(chan *websocketsv1.Message, 100),
storage: pubsubs,
log: log,
exit: make(chan struct{}),
@@ -42,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
return wp
}
-func (wp *WorkersPool) Queue(msg *message.Message) {
+func (wp *WorkersPool) Queue(msg *websocketsv1.Message) {
wp.queue <- msg
}
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index ef44884a..00c1dd91 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -2,7 +2,7 @@ package websockets
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
@@ -15,7 +15,7 @@ type rpc struct {
// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(in *message.Messages, ok *bool) error {
+func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("broadcast_publish")
// just return in case of nil message
@@ -47,7 +47,7 @@ func (r *rpc) Publish(in *message.Messages, ok *bool) error {
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(in *message.Messages, ok *bool) error {
+func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("publish_async")
// just return in case of nil message
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index 1bcb3455..760b6951 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -12,13 +12,13 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ payload "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis"
- "github.com/spiral/roadrunner/v2/plugins/kv/payload"
"github.com/spiral/roadrunner/v2/plugins/logger"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/stretchr/testify/assert"
@@ -107,7 +107,6 @@ func kvSetTest(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
-
p := &payload.Payload{
Storage: "boltdb-south",
Items: []*payload.Item{
@@ -130,7 +129,6 @@ func kvHasTest(t *testing.T) {
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
// WorkerList contains list of workers.
-
p := &payload.Payload{
Storage: "boltdb-south",
Items: []*payload.Item{
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 593085b7..4e4c09f1 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -16,7 +16,7 @@ import (
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -216,9 +216,9 @@ func TestWSRedisAndMemory(t *testing.T) {
}()
time.Sleep(time.Second * 1)
+ t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
t.Run("RPCWsMemory", RPCWsMemory)
t.Run("RPCWsRedis", RPCWsRedis)
- t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync)
stopCh <- struct{}{}
@@ -870,8 +870,8 @@ func publish2(command string, broker string, topics ...string) {
panic(err)
}
}
-func messageWS(command string, broker string, payload []byte, topics ...string) *message.Message {
- return &message.Message{
+func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message {
+ return &websocketsv1.Message{
Topics: topics,
Command: command,
Broker: broker,
@@ -879,9 +879,9 @@ func messageWS(command string, broker string, payload []byte, topics ...string)
}
}
-func makeMessage(command string, broker string, payload []byte, topics ...string) *message.Messages {
- m := &message.Messages{
- Messages: []*message.Message{
+func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Messages {
+ m := &websocketsv1.Messages{
+ Messages: []*websocketsv1.Message{
{
Topics: topics,
Command: command,