diff options
author | Valery Piashchynski <[email protected]> | 2021-06-08 22:04:28 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-08 22:04:28 +0300 |
commit | cc271dceb13d3929f0382311dfce3dfed2ce04ce (patch) | |
tree | 13c4c3f380d8309b95c9600cc2000d1d5ab87cda | |
parent | a8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 (diff) |
- Add protobuf versioning
Signed-off-by: Valery Piashchynski <[email protected]>
20 files changed, 171 insertions, 171 deletions
diff --git a/plugins/kv/payload/payload.pb.go b/pkg/proto/kv/v1beta/kv.pb.go index 60ad9f13..76450869 100644 --- a/plugins/kv/payload/payload.pb.go +++ b/pkg/proto/kv/v1beta/kv.pb.go @@ -2,9 +2,9 @@ // versions: // protoc-gen-go v1.26.0 // protoc v3.16.0 -// source: payload.proto +// source: kv.proto -package payload +package kvv1beta import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" @@ -33,7 +33,7 @@ type Payload struct { func (x *Payload) Reset() { *x = Payload{} if protoimpl.UnsafeEnabled { - mi := &file_payload_proto_msgTypes[0] + mi := &file_kv_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -46,7 +46,7 @@ func (x *Payload) String() string { func (*Payload) ProtoMessage() {} func (x *Payload) ProtoReflect() protoreflect.Message { - mi := &file_payload_proto_msgTypes[0] + mi := &file_kv_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -59,7 +59,7 @@ func (x *Payload) ProtoReflect() protoreflect.Message { // Deprecated: Use Payload.ProtoReflect.Descriptor instead. func (*Payload) Descriptor() ([]byte, []int) { - return file_payload_proto_rawDescGZIP(), []int{0} + return file_kv_proto_rawDescGZIP(), []int{0} } func (x *Payload) GetStorage() string { @@ -81,15 +81,16 @@ type Item struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + // RFC 3339 Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"` } func (x *Item) Reset() { *x = Item{} if protoimpl.UnsafeEnabled { - mi := &file_payload_proto_msgTypes[1] + mi := &file_kv_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -102,7 +103,7 @@ func (x *Item) String() string { func (*Item) ProtoMessage() {} func (x *Item) ProtoReflect() protoreflect.Message { - mi := &file_payload_proto_msgTypes[1] + mi := &file_kv_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -115,7 +116,7 @@ func (x *Item) ProtoReflect() protoreflect.Message { // Deprecated: Use Item.ProtoReflect.Descriptor instead. func (*Item) Descriptor() ([]byte, []int) { - return file_payload_proto_rawDescGZIP(), []int{1} + return file_kv_proto_rawDescGZIP(), []int{1} } func (x *Item) GetKey() string { @@ -139,42 +140,43 @@ func (x *Item) GetTimeout() string { return "" } -var File_payload_proto protoreflect.FileDescriptor +var File_kv_proto protoreflect.FileDescriptor -var file_payload_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x02, 0x6b, 0x76, 0x22, 0x43, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18, - 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x6b, 0x76, 0x2e, 0x49, 0x74, 0x65, - 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, - 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, - 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, - 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, - 0x75, 0x74, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var file_kv_proto_rawDesc = []byte{ + 0x0a, 0x08, 0x6b, 0x76, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x6b, 0x76, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x4a, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x69, 0x74, + 0x65, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6b, 0x76, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, + 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0d, 0x5a, 0x0b, 0x2e, + 0x2f, 0x3b, 0x6b, 0x76, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( - file_payload_proto_rawDescOnce sync.Once - file_payload_proto_rawDescData = file_payload_proto_rawDesc + file_kv_proto_rawDescOnce sync.Once + file_kv_proto_rawDescData = file_kv_proto_rawDesc ) -func file_payload_proto_rawDescGZIP() []byte { - file_payload_proto_rawDescOnce.Do(func() { - file_payload_proto_rawDescData = protoimpl.X.CompressGZIP(file_payload_proto_rawDescData) +func file_kv_proto_rawDescGZIP() []byte { + file_kv_proto_rawDescOnce.Do(func() { + file_kv_proto_rawDescData = protoimpl.X.CompressGZIP(file_kv_proto_rawDescData) }) - return file_payload_proto_rawDescData + return file_kv_proto_rawDescData } -var file_payload_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_payload_proto_goTypes = []interface{}{ - (*Payload)(nil), // 0: kv.Payload - (*Item)(nil), // 1: kv.Item +var file_kv_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_kv_proto_goTypes = []interface{}{ + (*Payload)(nil), // 0: kv.v1beta.Payload + (*Item)(nil), // 1: kv.v1beta.Item } -var file_payload_proto_depIdxs = []int32{ - 1, // 0: kv.Payload.items:type_name -> kv.Item +var file_kv_proto_depIdxs = []int32{ + 1, // 0: kv.v1beta.Payload.items:type_name -> kv.v1beta.Item 1, // [1:1] is the sub-list for method output_type 1, // [1:1] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name @@ -182,13 +184,13 @@ var file_payload_proto_depIdxs = []int32{ 0, // [0:1] is the sub-list for field type_name } -func init() { file_payload_proto_init() } -func file_payload_proto_init() { - if File_payload_proto != nil { +func init() { file_kv_proto_init() } +func file_kv_proto_init() { + if File_kv_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_payload_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_kv_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Payload); i { case 0: return &v.state @@ -200,7 +202,7 @@ func file_payload_proto_init() { return nil } } - file_payload_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_kv_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Item); i { case 0: return &v.state @@ -217,18 +219,18 @@ func file_payload_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_payload_proto_rawDesc, + RawDescriptor: file_kv_proto_rawDesc, NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_payload_proto_goTypes, - DependencyIndexes: file_payload_proto_depIdxs, - MessageInfos: file_payload_proto_msgTypes, + GoTypes: file_kv_proto_goTypes, + DependencyIndexes: file_kv_proto_depIdxs, + MessageInfos: file_kv_proto_msgTypes, }.Build() - File_payload_proto = out.File - file_payload_proto_rawDesc = nil - file_payload_proto_goTypes = nil - file_payload_proto_depIdxs = nil + File_kv_proto = out.File + file_kv_proto_rawDesc = nil + file_kv_proto_goTypes = nil + file_kv_proto_depIdxs = nil } diff --git a/plugins/kv/payload.proto b/pkg/proto/kv/v1beta/kv.proto index 66a22ee4..1ec0e6b7 100644 --- a/plugins/kv/payload.proto +++ b/pkg/proto/kv/v1beta/kv.proto @@ -1,7 +1,7 @@ syntax = "proto3"; -package kv.v1; -option go_package = "./payload"; +package kv.v1beta; +option go_package = "./;kvv1beta"; message Payload { // could be an enum in the future diff --git a/pkg/pubsub/message/message.pb.go b/pkg/proto/websockets/v1beta/websockets.pb.go index 3a73c39c..f04c3cb2 100644 --- a/pkg/pubsub/message/message.pb.go +++ b/pkg/proto/websockets/v1beta/websockets.pb.go @@ -2,9 +2,9 @@ // versions: // protoc-gen-go v1.26.0 // protoc v3.16.0 -// source: message.proto +// source: websockets.proto -package message +package websocketsv1beta import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" @@ -34,7 +34,7 @@ type Message struct { func (x *Message) Reset() { *x = Message{} if protoimpl.UnsafeEnabled { - mi := &file_message_proto_msgTypes[0] + mi := &file_websockets_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -47,7 +47,7 @@ func (x *Message) String() string { func (*Message) ProtoMessage() {} func (x *Message) ProtoReflect() protoreflect.Message { - mi := &file_message_proto_msgTypes[0] + mi := &file_websockets_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -60,7 +60,7 @@ func (x *Message) ProtoReflect() protoreflect.Message { // Deprecated: Use Message.ProtoReflect.Descriptor instead. func (*Message) Descriptor() ([]byte, []int) { - return file_message_proto_rawDescGZIP(), []int{0} + return file_websockets_proto_rawDescGZIP(), []int{0} } func (x *Message) GetCommand() string { @@ -102,7 +102,7 @@ type Messages struct { func (x *Messages) Reset() { *x = Messages{} if protoimpl.UnsafeEnabled { - mi := &file_message_proto_msgTypes[1] + mi := &file_websockets_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -115,7 +115,7 @@ func (x *Messages) String() string { func (*Messages) ProtoMessage() {} func (x *Messages) ProtoReflect() protoreflect.Message { - mi := &file_message_proto_msgTypes[1] + mi := &file_websockets_proto_msgTypes[1] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -128,7 +128,7 @@ func (x *Messages) ProtoReflect() protoreflect.Message { // Deprecated: Use Messages.ProtoReflect.Descriptor instead. func (*Messages) Descriptor() ([]byte, []int) { - return file_message_proto_rawDescGZIP(), []int{1} + return file_websockets_proto_rawDescGZIP(), []int{1} } func (x *Messages) GetMessages() []*Message { @@ -138,44 +138,46 @@ func (x *Messages) GetMessages() []*Message { return nil } -var File_message_proto protoreflect.FileDescriptor +var File_websockets_proto protoreflect.FileDescriptor -var file_message_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x09, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x76, 0x31, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, - 0x16, 0x0a, 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, - 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x3a, 0x0a, 0x08, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, - 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +var file_websockets_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x11, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x03, + 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x42, 0x0a, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, + 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77, + 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( - file_message_proto_rawDescOnce sync.Once - file_message_proto_rawDescData = file_message_proto_rawDesc + file_websockets_proto_rawDescOnce sync.Once + file_websockets_proto_rawDescData = file_websockets_proto_rawDesc ) -func file_message_proto_rawDescGZIP() []byte { - file_message_proto_rawDescOnce.Do(func() { - file_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_message_proto_rawDescData) +func file_websockets_proto_rawDescGZIP() []byte { + file_websockets_proto_rawDescOnce.Do(func() { + file_websockets_proto_rawDescData = protoimpl.X.CompressGZIP(file_websockets_proto_rawDescData) }) - return file_message_proto_rawDescData + return file_websockets_proto_rawDescData } -var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_message_proto_goTypes = []interface{}{ - (*Message)(nil), // 0: pubsub.v1.Message - (*Messages)(nil), // 1: pubsub.v1.Messages +var file_websockets_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_websockets_proto_goTypes = []interface{}{ + (*Message)(nil), // 0: websockets.v1beta.Message + (*Messages)(nil), // 1: websockets.v1beta.Messages } -var file_message_proto_depIdxs = []int32{ - 0, // 0: pubsub.v1.Messages.messages:type_name -> pubsub.v1.Message +var file_websockets_proto_depIdxs = []int32{ + 0, // 0: websockets.v1beta.Messages.messages:type_name -> websockets.v1beta.Message 1, // [1:1] is the sub-list for method output_type 1, // [1:1] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name @@ -183,13 +185,13 @@ var file_message_proto_depIdxs = []int32{ 0, // [0:1] is the sub-list for field type_name } -func init() { file_message_proto_init() } -func file_message_proto_init() { - if File_message_proto != nil { +func init() { file_websockets_proto_init() } +func file_websockets_proto_init() { + if File_websockets_proto != nil { return } if !protoimpl.UnsafeEnabled { - file_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_websockets_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Message); i { case 0: return &v.state @@ -201,7 +203,7 @@ func file_message_proto_init() { return nil } } - file_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_websockets_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Messages); i { case 0: return &v.state @@ -218,18 +220,18 @@ func file_message_proto_init() { out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_message_proto_rawDesc, + RawDescriptor: file_websockets_proto_rawDesc, NumEnums: 0, NumMessages: 2, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_message_proto_goTypes, - DependencyIndexes: file_message_proto_depIdxs, - MessageInfos: file_message_proto_msgTypes, + GoTypes: file_websockets_proto_goTypes, + DependencyIndexes: file_websockets_proto_depIdxs, + MessageInfos: file_websockets_proto_msgTypes, }.Build() - File_message_proto = out.File - file_message_proto_rawDesc = nil - file_message_proto_goTypes = nil - file_message_proto_depIdxs = nil + File_websockets_proto = out.File + file_websockets_proto_rawDesc = nil + file_websockets_proto_goTypes = nil + file_websockets_proto_depIdxs = nil } diff --git a/pkg/pubsub/message.proto b/pkg/proto/websockets/v1beta/websockets.proto index 772e7611..a61da93d 100644 --- a/pkg/pubsub/message.proto +++ b/pkg/proto/websockets/v1beta/websockets.proto @@ -1,7 +1,7 @@ syntax = "proto3"; -package pubsub.v1; -option go_package = "./message"; +package websockets.v1beta; +option go_package = "./;websocketsv1beta"; message Message { string command = 1; diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go index eb65b4b7..4926cad6 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/pubsub/interface.go @@ -1,6 +1,6 @@ package pubsub -import "github.com/spiral/roadrunner/v2/pkg/pubsub/message" +import websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" /* This interface is in BETA. It might be changed. @@ -42,5 +42,5 @@ type Publisher interface { // Reader interface should return next message type Reader interface { - Next() (*message.Message, error) + Next() (*websocketsv1.Message, error) } diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index af107dff..ba873513 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -10,9 +10,9 @@ import ( "time" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" bolt "go.etcd.io/bbolt" @@ -214,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } // Set puts the K/V to the bolt -func (d *Driver) Set(items ...*payload.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("boltdb_driver_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -324,7 +324,7 @@ func (d *Driver) Delete(keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (d *Driver) MExpire(items ...*payload.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("boltdb_driver_mexpire") for i := range items { if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 9c4689c4..8ea515d0 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,9 +6,9 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -135,7 +135,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) Set(items ...*payload.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("memcached_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -176,7 +176,7 @@ func (d *Driver) Set(items ...*payload.Item) error { // MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) MExpire(items ...*payload.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("memcached_plugin_mexpire") for i := range items { if items[i] == nil { diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index fae2c831..3158adee 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -6,9 +6,9 @@ import ( "time" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -72,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return utils.AsBytes(data.(*payload.Item).Value), nil + return utils.AsBytes(data.(*kvv1.Item).Value), nil } return nil, nil } @@ -95,14 +95,14 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { for i := range keys { if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(*payload.Item).Value + m[keys[i]] = value.(*kvv1.Item).Value } } return m, nil } -func (s *Driver) Set(items ...*payload.Item) error { +func (s *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -128,7 +128,7 @@ func (s *Driver) Set(items ...*payload.Item) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...*payload.Item) error { +func (s *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { if items[i] == nil { @@ -145,11 +145,11 @@ func (s *Driver) MExpire(items ...*payload.Item) error { if err != nil { return errors.E(op, err) } - tmp := pItem.(*payload.Item) + tmp := pItem.(*kvv1.Item) // guess that t is in the future // in memory is just FOR TESTING PURPOSES // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, &payload.Item{ + s.heap.Store(items[i].Key, &kvv1.Item{ Key: items[i].Key, Value: tmp.Value, Timeout: items[i].Timeout, @@ -178,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { for i := range keys { if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(*payload.Item).Timeout + m[keys[i]] = item.(*kvv1.Item).Timeout } } return m, nil @@ -216,7 +216,7 @@ func (s *Driver) gc() { case now := <-ticker.C: // check every second s.heap.Range(func(key, value interface{}) bool { - v := value.(*payload.Item) + v := value.(*kvv1.Item) if v.Timeout == "" { return true } diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go index 5890367b..0aaa6352 100644 --- a/plugins/kv/drivers/redis/driver.go +++ b/plugins/kv/drivers/redis/driver.go @@ -7,9 +7,9 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -138,7 +138,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // // Use expiration for `SETEX`-like behavior. // Zero expiration means the key has no expiration time. -func (d *Driver) Set(items ...*payload.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("redis_driver_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -187,7 +187,7 @@ func (d *Driver) Delete(keys ...string) error { // MExpire https://redis.io/commands/expire // timeout in RFC3339 -func (d *Driver) MExpire(items ...*payload.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("redis_driver_mexpire") now := time.Now() for _, item := range items { diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index abcd7f47..7841f9a2 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,6 +1,6 @@ package kv -import "github.com/spiral/roadrunner/v2/plugins/kv/payload" +import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" // Storage represents single abstract storage. type Storage interface { @@ -16,10 +16,10 @@ type Storage interface { // Set used to upload item to KV with TTL // 0 value in TTL means no TTL - Set(items ...*payload.Item) error + Set(items ...*kvv1.Item) error // MExpire sets the TTL for multiply keys - MExpire(items ...*payload.Item) error + MExpire(items ...*kvv1.Item) error // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 009a5c56..a9efe0c4 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,7 +2,7 @@ package kv import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -17,7 +17,7 @@ type rpc struct { } // Has accept []*payload.Payload proto payload with Storage and Item -func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error { +func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { const op = errors.Op("rpc_has") if in.Storage == "" { @@ -46,7 +46,7 @@ func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error { } // Set accept proto payload with Storage and Item -func (r *rpc) Set(in *payload.Payload, ok *bool) error { +func (r *rpc) Set(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_set") if st, exists := r.storages[in.GetStorage()]; exists { @@ -65,7 +65,7 @@ func (r *rpc) Set(in *payload.Payload, ok *bool) error { } // MGet accept proto payload with Storage and Item -func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error { +func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_mget") keys := make([]string, 0, len(in.GetItems())) @@ -89,7 +89,7 @@ func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error { } // MExpire accept proto payload with Storage and Item -func (r *rpc) MExpire(in *payload.Payload, ok *bool) error { +func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_mexpire") if st, exists := r.storages[in.GetStorage()]; exists { @@ -108,7 +108,7 @@ func (r *rpc) MExpire(in *payload.Payload, ok *bool) error { } // TTL accept proto payload with Storage and Item -func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error { +func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_ttl") keys := make([]string, 0, len(in.GetItems())) @@ -131,7 +131,7 @@ func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error { } // Delete accept proto payload with Storage and Item -func (r *rpc) Delete(in *payload.Payload, ok *bool) error { +func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rcp_delete") keys := make([]string, 0, len(in.GetItems())) diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 76bef400..ac9ebcc2 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,7 +4,7 @@ import ( "context" "sync" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" @@ -23,13 +23,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan *message.Message + out chan *websocketsv1.Message exit chan struct{} } func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *message.Message, 100) + out := make(chan *websocketsv1.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -67,7 +67,7 @@ func (fi *FanIn) read() { return } - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) if err != nil { fi.log.Error("message unmarshal") @@ -97,6 +97,6 @@ func (fi *FanIn) stop() error { return nil } -func (fi *FanIn) consume() <-chan *message.Message { +func (fi *FanIn) consume() <-chan *websocketsv1.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 695e7b08..47ffeb39 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" @@ -107,7 +107,7 @@ func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(msg, m) if err != nil { return errors.E(err) @@ -126,7 +126,7 @@ func (p *Plugin) PublishAsync(msg []byte) { go func() { p.Lock() defer p.Unlock() - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(msg, m) if err != nil { p.log.Error("message unmarshal error") @@ -209,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *Plugin) Next() (*message.Message, error) { +func (p *Plugin) Next() (*websocketsv1.Message, error) { return <-p.fanin.consume(), nil } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 951c9a1a..e3d47166 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -8,8 +8,8 @@ import ( "github.com/fasthttp/websocket" json "github.com/json-iterator/go" "github.com/spiral/errors" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" @@ -64,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit return errors.E(op, err) } - msg := &message.Message{} + msg := &websocketsv1.Message{} err = json.Unmarshal(data, msg) if err != nil { diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go index deb927ed..cef28182 100644 --- a/plugins/websockets/memory/inMemory.go +++ b/plugins/websockets/memory/inMemory.go @@ -4,8 +4,8 @@ import ( "sync" "github.com/spiral/roadrunner/v2/pkg/bst" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) @@ -67,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } } -func (p *Plugin) Next() (*message.Message, error) { +func (p *Plugin) Next() (*websocketsv1.Message, error) { msg := <-p.pushCh if msg == nil { return nil, nil @@ -76,7 +76,7 @@ func (p *Plugin) Next() (*message.Message, error) { p.RLock() defer p.RUnlock() - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(msg, m) if err != nil { return nil, err diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index cf21fffa..6ddd609c 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -14,8 +14,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -80,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.serveExit = make(chan struct{}) p.server = server + // attach default driver + p.pubsubs["memory"] = memory.NewInMemory(p.log) + return nil } @@ -91,11 +94,6 @@ func (p *Plugin) Serve() chan error { p.Lock() defer p.Unlock() - // attach default driver - if len(p.pubsubs) == 0 { - p.pubsubs["memory"] = memory.NewInMemory(p.log) - } - p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, @@ -307,7 +305,7 @@ func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() - msg := &message.Message{} + msg := &websocketsv1.Message{} err := proto.Unmarshal(m, msg) if err != nil { return err @@ -331,7 +329,7 @@ func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - msg := &message.Message{} + msg := &websocketsv1.Message{} err := proto.Unmarshal(m, msg) if err != nil { p.log.Error("message unmarshal") diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index efafb2d3..1a7c6f8a 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,8 +4,8 @@ import ( "sync" "github.com/fasthttp/websocket" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" ) @@ -16,7 +16,7 @@ type WorkersPool struct { resPool sync.Pool log logger.Logger - queue chan *message.Message + queue chan *websocketsv1.Message exit chan struct{} } @@ -24,7 +24,7 @@ type WorkersPool struct { func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *message.Message, 100), + queue: make(chan *websocketsv1.Message, 100), storage: pubsubs, log: log, exit: make(chan struct{}), @@ -42,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log return wp } -func (wp *WorkersPool) Queue(msg *message.Message) { +func (wp *WorkersPool) Queue(msg *websocketsv1.Message) { wp.queue <- msg } diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index ef44884a..00c1dd91 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -2,7 +2,7 @@ package websockets import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) @@ -15,7 +15,7 @@ type rpc struct { // Publish ... msg is a proto decoded payload // see: pkg/pubsub/message.fbs -func (r *rpc) Publish(in *message.Messages, ok *bool) error { +func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error { const op = errors.Op("broadcast_publish") // just return in case of nil message @@ -47,7 +47,7 @@ func (r *rpc) Publish(in *message.Messages, ok *bool) error { // PublishAsync ... // see: pkg/pubsub/message.fbs -func (r *rpc) PublishAsync(in *message.Messages, ok *bool) error { +func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error { const op = errors.Op("publish_async") // just return in case of nil message diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index 1bcb3455..760b6951 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -12,13 +12,13 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + payload "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/stretchr/testify/assert" @@ -107,7 +107,6 @@ func kvSetTest(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - p := &payload.Payload{ Storage: "boltdb-south", Items: []*payload.Item{ @@ -130,7 +129,6 @@ func kvHasTest(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - p := &payload.Payload{ Storage: "boltdb-south", Items: []*payload.Item{ diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 593085b7..4e4c09f1 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -16,7 +16,7 @@ import ( json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -216,9 +216,9 @@ func TestWSRedisAndMemory(t *testing.T) { }() time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) t.Run("RPCWsMemory", RPCWsMemory) t.Run("RPCWsRedis", RPCWsRedis) - t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) stopCh <- struct{}{} @@ -870,8 +870,8 @@ func publish2(command string, broker string, topics ...string) { panic(err) } } -func messageWS(command string, broker string, payload []byte, topics ...string) *message.Message { - return &message.Message{ +func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message { + return &websocketsv1.Message{ Topics: topics, Command: command, Broker: broker, @@ -879,9 +879,9 @@ func messageWS(command string, broker string, payload []byte, topics ...string) } } -func makeMessage(command string, broker string, payload []byte, topics ...string) *message.Messages { - m := &message.Messages{ - Messages: []*message.Message{ +func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Messages { + m := &websocketsv1.Messages{ + Messages: []*websocketsv1.Message{ { Topics: topics, Command: command, |