From 47c40407a7ca5f1391f4d3d504d0def166eac4e9 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 8 Jun 2021 18:03:48 +0300 Subject: - Switch from the flatbuffers to the protobuf Signed-off-by: Valery Piashchynski --- go.mod | 3 +- go.sum | 2 - pkg/pubsub/message.fbs | 14 - pkg/pubsub/message.go | 15 - pkg/pubsub/message.proto | 15 + pkg/pubsub/message/Message.go | 118 ---- pkg/pubsub/message/Messages.go | 67 --- pkg/pubsub/message/message.pb.go | 235 ++++++++ plugins/kv/drivers/boltdb/driver.go | 17 +- plugins/kv/drivers/memcached/driver.go | 18 +- plugins/kv/drivers/memcached/plugin.go | 2 - plugins/kv/drivers/memory/driver.go | 43 +- plugins/kv/drivers/redis/driver.go | 20 +- plugins/kv/interface.go | 14 +- plugins/kv/payload.proto | 17 + plugins/kv/payload/generated/Item.go | 67 --- plugins/kv/payload/generated/Payload.go | 71 --- plugins/kv/payload/payload.fbs | 14 - plugins/kv/payload/payload.pb.go | 234 ++++++++ plugins/kv/rpc.go | 153 ++---- plugins/memory/plugin.go | 14 +- plugins/redis/fanin.go | 11 +- plugins/redis/plugin.go | 27 +- plugins/websockets/executor/executor.go | 3 +- plugins/websockets/plugin.go | 34 +- plugins/websockets/pool/workers_pool.go | 46 +- plugins/websockets/rpc.go | 91 +--- tests/plugins/kv/storage_plugin_test.go | 632 ++++++++++++---------- tests/plugins/websockets/websocket_plugin_test.go | 79 +-- 29 files changed, 1049 insertions(+), 1027 deletions(-) delete mode 100644 pkg/pubsub/message.fbs delete mode 100644 pkg/pubsub/message.go create mode 100644 pkg/pubsub/message.proto delete mode 100644 pkg/pubsub/message/Message.go delete mode 100644 pkg/pubsub/message/Messages.go create mode 100644 pkg/pubsub/message/message.pb.go create mode 100644 plugins/kv/payload.proto delete mode 100644 plugins/kv/payload/generated/Item.go delete mode 100644 plugins/kv/payload/generated/Payload.go delete mode 100644 plugins/kv/payload/payload.fbs create mode 100644 plugins/kv/payload/payload.pb.go diff --git a/go.mod b/go.mod index 797881ca..0b8661c0 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/go-redis/redis/v8 v8.9.0 github.com/gofiber/fiber/v2 v2.10.0 github.com/golang/mock v1.4.4 - github.com/google/flatbuffers v2.0.0+incompatible + github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.2.0 github.com/json-iterator/go v1.1.11 github.com/klauspost/compress v1.13.0 @@ -36,4 +36,5 @@ require ( golang.org/x/net v0.0.0-20210226101413-39120d07d75e golang.org/x/sync v0.0.0-20201207232520-09787c993a3a golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 + google.golang.org/protobuf v1.23.0 ) diff --git a/go.sum b/go.sum index a37991a8..f0b99003 100644 --- a/go.sum +++ b/go.sum @@ -148,8 +148,6 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/flatbuffers v2.0.0+incompatible h1:dicJ2oXwypfwUGnB2/TYWYEKiuk9eYQlQO/AnOHl5mI= -github.com/google/flatbuffers v2.0.0+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= diff --git a/pkg/pubsub/message.fbs b/pkg/pubsub/message.fbs deleted file mode 100644 index 7e975894..00000000 --- a/pkg/pubsub/message.fbs +++ /dev/null @@ -1,14 +0,0 @@ -namespace message; - -table Message { - command:string; - broker:string; - topics:[string]; - payload:[byte]; -} - -table Messages { - messages:[Message]; -} - -root_type Messages; diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go deleted file mode 100644 index 74348722..00000000 --- a/pkg/pubsub/message.go +++ /dev/null @@ -1,15 +0,0 @@ -package pubsub - -type Message struct { - // Command (join, leave, headers) - Command string `json:"command"` - - // Broker (redis, memory) - Broker string `json:"broker"` - - // Topic message been pushed into. - Topics []string `json:"topic"` - - // Payload to be broadcasted - Payload []byte `json:"payload"` -} diff --git a/pkg/pubsub/message.proto b/pkg/pubsub/message.proto new file mode 100644 index 00000000..772e7611 --- /dev/null +++ b/pkg/pubsub/message.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package pubsub.v1; +option go_package = "./message"; + +message Message { + string command = 1; + string broker = 2; + repeated string topics = 3; + bytes payload = 4; +} + +message Messages { + repeated Message messages = 1; +} diff --git a/pkg/pubsub/message/Message.go b/pkg/pubsub/message/Message.go deleted file mode 100644 index 26bbd12c..00000000 --- a/pkg/pubsub/message/Message.go +++ /dev/null @@ -1,118 +0,0 @@ -// Code generated by the FlatBuffers compiler. DO NOT EDIT. - -package message - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) - -type Message struct { - _tab flatbuffers.Table -} - -func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Message{} - x.Init(buf, n+offset) - return x -} - -func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { - n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) - x := &Message{} - x.Init(buf, n+offset+flatbuffers.SizeUint32) - return x -} - -func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Message) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *Message) Command() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Message) Broker() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Message) Topics(j int) []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4)) - } - return nil -} - -func (rcv *Message) TopicsLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func (rcv *Message) Payload(j int) int8 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1)) - } - return 0 -} - -func (rcv *Message) PayloadLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func (rcv *Message) MutatePayload(j int, n int8) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n) - } - return false -} - -func MessageStart(builder *flatbuffers.Builder) { - builder.StartObject(4) -} -func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0) -} -func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0) -} -func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0) -} -func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { - return builder.StartVector(4, numElems, 4) -} -func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0) -} -func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { - return builder.StartVector(1, numElems, 1) -} -func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go deleted file mode 100644 index 633b367d..00000000 --- a/pkg/pubsub/message/Messages.go +++ /dev/null @@ -1,67 +0,0 @@ -// Code generated by the FlatBuffers compiler. DO NOT EDIT. - -package message - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) - -type Messages struct { - _tab flatbuffers.Table -} - -func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Messages{} - x.Init(buf, n+offset) - return x -} - -func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { - n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) - x := &Messages{} - x.Init(buf, n+offset+flatbuffers.SizeUint32) - return x -} - -func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Messages) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *Messages) Messages(obj *Message, j int) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - x := rcv._tab.Vector(o) - x += flatbuffers.UOffsetT(j) * 4 - x = rcv._tab.Indirect(x) - obj.Init(rcv._tab.Bytes, x) - return true - } - return false -} - -func (rcv *Messages) MessagesLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func MessagesStart(builder *flatbuffers.Builder) { - builder.StartObject(1) -} -func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0) -} -func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { - return builder.StartVector(4, numElems, 4) -} -func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} diff --git a/pkg/pubsub/message/message.pb.go b/pkg/pubsub/message/message.pb.go new file mode 100644 index 00000000..3a73c39c --- /dev/null +++ b/pkg/pubsub/message/message.pb.go @@ -0,0 +1,235 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.16.0 +// source: message.proto + +package message + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Command string `protobuf:"bytes,1,opt,name=command,proto3" json:"command,omitempty"` + Broker string `protobuf:"bytes,2,opt,name=broker,proto3" json:"broker,omitempty"` + Topics []string `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"` + Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetCommand() string { + if x != nil { + return x.Command + } + return "" +} + +func (x *Message) GetBroker() string { + if x != nil { + return x.Broker + } + return "" +} + +func (x *Message) GetTopics() []string { + if x != nil { + return x.Topics + } + return nil +} + +func (x *Message) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +type Messages struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` +} + +func (x *Messages) Reset() { + *x = Messages{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Messages) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Messages) ProtoMessage() {} + +func (x *Messages) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Messages.ProtoReflect.Descriptor instead. +func (*Messages) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{1} +} + +func (x *Messages) GetMessages() []*Message { + if x != nil { + return x.Messages + } + return nil +} + +var File_message_proto protoreflect.FileDescriptor + +var file_message_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x09, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x76, 0x31, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, + 0x16, 0x0a, 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, + 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x3a, 0x0a, 0x08, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, + 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_message_proto_rawDescOnce sync.Once + file_message_proto_rawDescData = file_message_proto_rawDesc +) + +func file_message_proto_rawDescGZIP() []byte { + file_message_proto_rawDescOnce.Do(func() { + file_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_message_proto_rawDescData) + }) + return file_message_proto_rawDescData +} + +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_message_proto_goTypes = []interface{}{ + (*Message)(nil), // 0: pubsub.v1.Message + (*Messages)(nil), // 1: pubsub.v1.Messages +} +var file_message_proto_depIdxs = []int32{ + 0, // 0: pubsub.v1.Messages.messages:type_name -> pubsub.v1.Message + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_message_proto_init() } +func file_message_proto_init() { + if File_message_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Messages); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_message_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_message_proto_goTypes, + DependencyIndexes: file_message_proto_depIdxs, + MessageInfos: file_message_proto_msgTypes, + }.Build() + File_message_proto = out.File + file_message_proto_rawDesc = nil + file_message_proto_goTypes = nil + file_message_proto_depIdxs = nil +} diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 0f647cb1..af107dff 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -12,6 +12,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" bolt "go.etcd.io/bbolt" @@ -213,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } // Set puts the K/V to the bolt -func (d *Driver) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...*payload.Item) error { const op = errors.Op("boltdb_driver_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -259,14 +260,14 @@ func (d *Driver) Set(items ...kv.Item) error { // if there are no errors, and TTL > 0, we put the key with timeout to the hashmap, for future check // we do not need mutex here, since we use sync.Map - if items[i].TTL != "" { + if items[i].Timeout != "" { // check correctness of provided TTL - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } // Store key TTL in the separate map - d.gc.Store(items[i].Key, items[i].TTL) + d.gc.Store(items[i].Key, items[i].Timeout) } buf.Reset() @@ -323,20 +324,20 @@ func (d *Driver) Delete(keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (d *Driver) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...*payload.Item) error { const op = errors.Op("boltdb_driver_mexpire") for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } // verify provided TTL - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } - d.gc.Store(items[i].Key, items[i].TTL) + d.gc.Store(items[i].Key, items[i].Timeout) } return nil } diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 02281ed5..9c4689c4 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -8,6 +8,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -134,14 +135,14 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...*payload.Item) error { const op = errors.Op("memcached_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) } for i := range items { - if items[i] == EmptyItem { + if items[i] == nil { return errors.E(op, errors.EmptyItem) } @@ -154,9 +155,9 @@ func (d *Driver) Set(items ...kv.Item) error { } // add additional TTL in case of TTL isn't empty - if items[i].TTL != "" { + if items[i].Timeout != "" { // verify the TTL - t, err := time.Parse(time.RFC3339, items[i].TTL) + t, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return err } @@ -175,15 +176,18 @@ func (d *Driver) Set(items ...kv.Item) error { // MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...*payload.Item) error { const op = errors.Op("memcached_plugin_mexpire") for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } // verify provided TTL - t, err := time.Parse(time.RFC3339, items[i].TTL) + t, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index cde84f42..3997e0d4 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -9,8 +9,6 @@ import ( const PluginName = "memcached" -var EmptyItem = kv.Item{} - type Plugin struct { // config plugin cfgPlugin config.Configurer diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index c2494ee7..fae2c831 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -8,6 +8,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -71,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return utils.AsBytes(data.(kv.Item).Value), nil + return utils.AsBytes(data.(*payload.Item).Value), nil } return nil, nil } @@ -94,24 +95,27 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { for i := range keys { if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(kv.Item).Value + m[keys[i]] = value.(*payload.Item).Value } } return m, nil } -func (s *Driver) Set(items ...kv.Item) error { +func (s *Driver) Set(items ...*payload.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) } for i := range items { + if items[i] == nil { + continue + } // TTL is set - if items[i].TTL != "" { + if items[i].Timeout != "" { // check the TTL in the item - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return err } @@ -124,28 +128,31 @@ func (s *Driver) Set(items ...kv.Item) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...kv.Item) error { +func (s *Driver) MExpire(items ...*payload.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { - if items[i].TTL == "" || strings.TrimSpace(items[i].Key) == "" { + if items[i] == nil { + continue + } + if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } // if key exist, overwrite it value - if pItem, ok := s.heap.Load(items[i].Key); ok { + if pItem, ok := s.heap.LoadAndDelete(items[i].Key); ok { // check that time is correct - _, err := time.Parse(time.RFC3339, items[i].TTL) + _, err := time.Parse(time.RFC3339, items[i].Timeout) if err != nil { return errors.E(op, err) } - tmp := pItem.(kv.Item) + tmp := pItem.(*payload.Item) // guess that t is in the future // in memory is just FOR TESTING PURPOSES // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, kv.Item{ - Key: items[i].Key, - Value: tmp.Value, - TTL: items[i].TTL, + s.heap.Store(items[i].Key, &payload.Item{ + Key: items[i].Key, + Value: tmp.Value, + Timeout: items[i].Timeout, }) } } @@ -171,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { for i := range keys { if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(kv.Item).TTL + m[keys[i]] = item.(*payload.Item).Timeout } } return m, nil @@ -209,12 +216,12 @@ func (s *Driver) gc() { case now := <-ticker.C: // check every second s.heap.Range(func(key, value interface{}) bool { - v := value.(kv.Item) - if v.TTL == "" { + v := value.(*payload.Item) + if v.Timeout == "" { return true } - t, err := time.Parse(time.RFC3339, v.TTL) + t, err := time.Parse(time.RFC3339, v.Timeout) if err != nil { return false } diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go index d0b541b2..5890367b 100644 --- a/plugins/kv/drivers/redis/driver.go +++ b/plugins/kv/drivers/redis/driver.go @@ -9,11 +9,10 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" ) -var EmptyItem = kv.Item{} - type Driver struct { universalClient redis.UniversalClient log logger.Logger @@ -139,24 +138,24 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // // Use expiration for `SETEX`-like behavior. // Zero expiration means the key has no expiration time. -func (d *Driver) Set(items ...kv.Item) error { +func (d *Driver) Set(items ...*payload.Item) error { const op = errors.Op("redis_driver_set") if items == nil { return errors.E(op, errors.NoKeys) } now := time.Now() for _, item := range items { - if item == EmptyItem { + if item == nil { return errors.E(op, errors.EmptyKey) } - if item.TTL == "" { + if item.Timeout == "" { err := d.universalClient.Set(context.Background(), item.Key, item.Value, 0).Err() if err != nil { return err } } else { - t, err := time.Parse(time.RFC3339, item.TTL) + t, err := time.Parse(time.RFC3339, item.Timeout) if err != nil { return err } @@ -188,15 +187,18 @@ func (d *Driver) Delete(keys ...string) error { // MExpire https://redis.io/commands/expire // timeout in RFC3339 -func (d *Driver) MExpire(items ...kv.Item) error { +func (d *Driver) MExpire(items ...*payload.Item) error { const op = errors.Op("redis_driver_mexpire") now := time.Now() for _, item := range items { - if item.TTL == "" || strings.TrimSpace(item.Key) == "" { + if item == nil { + continue + } + if item.Timeout == "" || strings.TrimSpace(item.Key) == "" { return errors.E(op, errors.Str("should set timeout and at least one key")) } - t, err := time.Parse(time.RFC3339, item.TTL) + t, err := time.Parse(time.RFC3339, item.Timeout) if err != nil { return err } diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 20dbb8b3..abcd7f47 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,14 +1,6 @@ package kv -// Item represents general storage item -type Item struct { - // Key of item - Key string - // Value of item - Value string - // live until time provided by TTL in RFC 3339 format - TTL string -} +import "github.com/spiral/roadrunner/v2/plugins/kv/payload" // Storage represents single abstract storage. type Storage interface { @@ -24,10 +16,10 @@ type Storage interface { // Set used to upload item to KV with TTL // 0 value in TTL means no TTL - Set(items ...Item) error + Set(items ...*payload.Item) error // MExpire sets the TTL for multiply keys - MExpire(items ...Item) error + MExpire(items ...*payload.Item) error // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb diff --git a/plugins/kv/payload.proto b/plugins/kv/payload.proto new file mode 100644 index 00000000..66a22ee4 --- /dev/null +++ b/plugins/kv/payload.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package kv.v1; +option go_package = "./payload"; + +message Payload { + // could be an enum in the future + string storage = 1; + repeated Item items = 2; +} + +message Item { + string key = 1; + string value = 2; + // RFC 3339 + string timeout = 3; +} diff --git a/plugins/kv/payload/generated/Item.go b/plugins/kv/payload/generated/Item.go deleted file mode 100644 index 61bd6024..00000000 --- a/plugins/kv/payload/generated/Item.go +++ /dev/null @@ -1,67 +0,0 @@ -// Code generated by the FlatBuffers compiler. DO NOT EDIT. - -package generated - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) - -type Item struct { - _tab flatbuffers.Table -} - -func GetRootAsItem(buf []byte, offset flatbuffers.UOffsetT) *Item { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Item{} - x.Init(buf, n+offset) - return x -} - -func (rcv *Item) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Item) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *Item) Key() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Item) Value() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Item) Timeout() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func ItemStart(builder *flatbuffers.Builder) { - builder.StartObject(3) -} -func ItemAddKey(builder *flatbuffers.Builder, Key flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Key), 0) -} -func ItemAddValue(builder *flatbuffers.Builder, Value flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Value), 0) -} -func ItemAddTimeout(builder *flatbuffers.Builder, Timeout flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(Timeout), 0) -} -func ItemEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} diff --git a/plugins/kv/payload/generated/Payload.go b/plugins/kv/payload/generated/Payload.go deleted file mode 100644 index a2c6cfdb..00000000 --- a/plugins/kv/payload/generated/Payload.go +++ /dev/null @@ -1,71 +0,0 @@ -// Code generated by the FlatBuffers compiler. DO NOT EDIT. - -package generated - -import ( - flatbuffers "github.com/google/flatbuffers/go" -) - -type Payload struct { - _tab flatbuffers.Table -} - -func GetRootAsPayload(buf []byte, offset flatbuffers.UOffsetT) *Payload { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &Payload{} - x.Init(buf, n+offset) - return x -} - -func (rcv *Payload) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *Payload) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *Payload) Storage() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *Payload) Items(obj *Item, j int) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - x := rcv._tab.Vector(o) - x += flatbuffers.UOffsetT(j) * 4 - x = rcv._tab.Indirect(x) - obj.Init(rcv._tab.Bytes, x) - return true - } - return false -} - -func (rcv *Payload) ItemsLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func PayloadStart(builder *flatbuffers.Builder) { - builder.StartObject(2) -} -func PayloadAddStorage(builder *flatbuffers.Builder, Storage flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(Storage), 0) -} -func PayloadAddItems(builder *flatbuffers.Builder, Items flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(Items), 0) -} -func PayloadStartItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { - return builder.StartVector(4, numElems, 4) -} -func PayloadEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} diff --git a/plugins/kv/payload/payload.fbs b/plugins/kv/payload/payload.fbs deleted file mode 100644 index 7e02c1a0..00000000 --- a/plugins/kv/payload/payload.fbs +++ /dev/null @@ -1,14 +0,0 @@ -namespace generated; - -table Payload { - Storage:string; - Items:[Item]; -} - -table Item { - Key:string; - Value:string; - Timeout:string; -} - -root_type Payload; diff --git a/plugins/kv/payload/payload.pb.go b/plugins/kv/payload/payload.pb.go new file mode 100644 index 00000000..60ad9f13 --- /dev/null +++ b/plugins/kv/payload/payload.pb.go @@ -0,0 +1,234 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.16.0 +// source: payload.proto + +package payload + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Payload struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // could be an enum in the future + Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"` + Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"` +} + +func (x *Payload) Reset() { + *x = Payload{} + if protoimpl.UnsafeEnabled { + mi := &file_payload_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Payload) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Payload) ProtoMessage() {} + +func (x *Payload) ProtoReflect() protoreflect.Message { + mi := &file_payload_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Payload.ProtoReflect.Descriptor instead. +func (*Payload) Descriptor() ([]byte, []int) { + return file_payload_proto_rawDescGZIP(), []int{0} +} + +func (x *Payload) GetStorage() string { + if x != nil { + return x.Storage + } + return "" +} + +func (x *Payload) GetItems() []*Item { + if x != nil { + return x.Items + } + return nil +} + +type Item struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"` +} + +func (x *Item) Reset() { + *x = Item{} + if protoimpl.UnsafeEnabled { + mi := &file_payload_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Item) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Item) ProtoMessage() {} + +func (x *Item) ProtoReflect() protoreflect.Message { + mi := &file_payload_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Item.ProtoReflect.Descriptor instead. +func (*Item) Descriptor() ([]byte, []int) { + return file_payload_proto_rawDescGZIP(), []int{1} +} + +func (x *Item) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *Item) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +func (x *Item) GetTimeout() string { + if x != nil { + return x.Timeout + } + return "" +} + +var File_payload_proto protoreflect.FileDescriptor + +var file_payload_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x02, 0x6b, 0x76, 0x22, 0x43, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18, + 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x6b, 0x76, 0x2e, 0x49, 0x74, 0x65, + 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, + 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, + 0x75, 0x74, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_payload_proto_rawDescOnce sync.Once + file_payload_proto_rawDescData = file_payload_proto_rawDesc +) + +func file_payload_proto_rawDescGZIP() []byte { + file_payload_proto_rawDescOnce.Do(func() { + file_payload_proto_rawDescData = protoimpl.X.CompressGZIP(file_payload_proto_rawDescData) + }) + return file_payload_proto_rawDescData +} + +var file_payload_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_payload_proto_goTypes = []interface{}{ + (*Payload)(nil), // 0: kv.Payload + (*Item)(nil), // 1: kv.Item +} +var file_payload_proto_depIdxs = []int32{ + 1, // 0: kv.Payload.items:type_name -> kv.Item + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_payload_proto_init() } +func file_payload_proto_init() { + if File_payload_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_payload_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Payload); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_payload_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Item); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_payload_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_payload_proto_goTypes, + DependencyIndexes: file_payload_proto_depIdxs, + MessageInfos: file_payload_proto_msgTypes, + }.Build() + File_payload_proto = out.File + file_payload_proto_rawDesc = nil + file_payload_proto_goTypes = nil + file_payload_proto_depIdxs = nil +} diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 2d4babbe..009a5c56 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,9 +2,8 @@ package kv import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" ) // Wrapper for the plugin @@ -17,23 +16,21 @@ type rpc struct { log logger.Logger } -// Has accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Has(in []byte, res *map[string]bool) error { +// Has accept []*payload.Payload proto payload with Storage and Item +func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error { const op = errors.Op("rpc_has") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) + if in.Storage == "" { + return errors.E(op, errors.Str("no storage provided")) + } - tmpItem := &generated.Item{} - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, utils.AsString(tmpItem.Key())) + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, ok := r.storages[utils.AsString(dataRoot.Storage())]; ok { + if st, ok := r.storages[in.Storage]; ok { ret, err := st.Has(keys...) if err != nil { return err @@ -45,35 +42,15 @@ func (r *rpc) Has(in []byte, res *map[string]bool) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// Set accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Set(in []byte, ok *bool) error { +// Set accept proto payload with Storage and Item +func (r *rpc) Set(in *payload.Payload, ok *bool) error { const op = errors.Op("rpc_set") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - - items := make([]Item, 0, dataRoot.ItemsLength()) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - - itc := Item{ - Key: string(tmpItem.Key()), - Value: string(tmpItem.Value()), - TTL: string(tmpItem.Timeout()), - } - - items = append(items, itc) - } - - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { - err := st.Set(items...) + if st, exists := r.storages[in.GetStorage()]; exists { + err := st.Set(in.GetItems()...) if err != nil { return err } @@ -84,26 +61,20 @@ func (r *rpc) Set(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// MGet accept []byte flatbuffers payload with Storage and Item -func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { +// MGet accept proto payload with Storage and Item +func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_mget") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} + keys := make([]string, 0, len(in.GetItems())) - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { ret, err := st.MGet(keys...) if err != nil { return err @@ -114,36 +85,15 @@ func (r *rpc) MGet(in []byte, res *map[string]interface{}) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// MExpire accept []byte flatbuffers payload with Storage and Item -func (r *rpc) MExpire(in []byte, ok *bool) error { +// MExpire accept proto payload with Storage and Item +func (r *rpc) MExpire(in *payload.Payload, ok *bool) error { const op = errors.Op("rpc_mexpire") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - - // when unmarshalling the keys, simultaneously, fill up the slice with items - items := make([]Item, 0, l) - tmpItem := &generated.Item{} - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - - itc := Item{ - Key: string(tmpItem.Key()), - // we set up timeout on the keys, so, value here is redundant - Value: "", - TTL: string(tmpItem.Timeout()), - } - - items = append(items, itc) - } - - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { - err := st.MExpire(items...) + if st, exists := r.storages[in.GetStorage()]; exists { + err := st.MExpire(in.GetItems()...) if err != nil { return errors.E(op, err) } @@ -154,25 +104,19 @@ func (r *rpc) MExpire(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// TTL accept []byte flatbuffers payload with Storage and Item -func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { +// TTL accept proto payload with Storage and Item +func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_ttl") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { ret, err := st.TTL(keys...) if err != nil { return err @@ -183,24 +127,19 @@ func (r *rpc) TTL(in []byte, res *map[string]interface{}) error { return nil } - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } -// Delete accept []byte flatbuffers payload with Storage and Item -func (r *rpc) Delete(in []byte, ok *bool) error { +// Delete accept proto payload with Storage and Item +func (r *rpc) Delete(in *payload.Payload, ok *bool) error { const op = errors.Op("rcp_delete") - dataRoot := generated.GetRootAsPayload(in, 0) - l := dataRoot.ItemsLength() - keys := make([]string, 0, l) - tmpItem := &generated.Item{} - - for i := 0; i < l; i++ { - if !dataRoot.Items(tmpItem, i) { - continue - } - keys = append(keys, string(tmpItem.Key())) + + keys := make([]string, 0, len(in.GetItems())) + + for i := 0; i < len(in.GetItems()); i++ { + keys = append(keys, in.Items[i].Key) } - if st, exists := r.storages[utils.AsString(dataRoot.Storage())]; exists { + if st, exists := r.storages[in.GetStorage()]; exists { err := st.Delete(keys...) if err != nil { return errors.E(op, err) @@ -212,5 +151,5 @@ func (r *rpc) Delete(in []byte, ok *bool) error { } *ok = false - return errors.E(op, errors.Errorf("no such storage: %s", dataRoot.Storage())) + return errors.E(op, errors.Errorf("no such storage: %s", in.GetStorage())) } diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 6732ff5d..d724dff9 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -6,7 +6,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/bst" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/protobuf/proto" ) const ( @@ -86,15 +86,19 @@ func (p *Plugin) Next() (*message.Message, error) { p.RLock() defer p.RUnlock() - fbsMsg := message.GetRootAsMessage(msg, 0) + m := &message.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return nil, err + } // push only messages, which are subscribed // TODO better??? - for i := 0; i < fbsMsg.TopicsLength(); i++ { + for i := 0; i < len(m.GetTopics()); i++ { // if we have active subscribers - send a message to a topic // or send nil instead - if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok { - return fbsMsg, nil + if ok := p.storage.Contains(m.GetTopics()[i]); ok { + return m, nil } } return nil, nil diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 321bfaaa..76bef400 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -6,6 +6,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" "github.com/go-redis/redis/v8" "github.com/spiral/errors" @@ -65,7 +66,15 @@ func (fi *FanIn) read() { if !ok { return } - fi.out <- message.GetRootAsMessage(utils.AsBytes(msg.Payload), 0) + + m := &message.Message{} + err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) + if err != nil { + fi.log.Error("message unmarshal") + continue + } + + fi.out <- m case <-fi.exit: return } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index b2603a40..695e7b08 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -9,7 +9,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/protobuf/proto" ) const PluginName = "redis" @@ -107,10 +107,14 @@ func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(msg, 0) + m := &message.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return errors.E(err) + } - for j := 0; j < fbsMsg.TopicsLength(); j++ { - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) if f.Err() != nil { return f.Err() } @@ -122,12 +126,17 @@ func (p *Plugin) PublishAsync(msg []byte) { go func() { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(msg, 0) - for j := 0; j < fbsMsg.TopicsLength(); j++ { - f := p.universalClient.Publish(context.Background(), utils.AsString(fbsMsg.Topics(j)), fbsMsg.Table().Bytes) + m := &message.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + p.log.Error("message unmarshal error") + return + } + + for j := 0; j < len(m.GetTopics()); j++ { + f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg) if f.Err() != nil { - p.log.Error("errors publishing message", "topic", fbsMsg.Topics(j), "error", f.Err().Error()) - return + p.log.Error("redis publish", "error", f.Err()) } } }() diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 69aad7d4..951c9a1a 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -9,6 +9,7 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" @@ -63,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit return errors.E(op, err) } - msg := &pubsub.Message{} + msg := &message.Message{} err = json.Unmarshal(data, msg) if err != nil { diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 4c0edcad..39a4e139 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -25,7 +25,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/executor" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" - "github.com/spiral/roadrunner/v2/utils" + "google.golang.org/protobuf/proto" ) const ( @@ -301,16 +301,21 @@ func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() + msg := &message.Message{} + err := proto.Unmarshal(m, msg) + if err != nil { + return err + } + // Get payload - fbsMsg := message.GetRootAsMessage(m, 0) - for i := 0; i < fbsMsg.TopicsLength(); i++ { - if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { - err := br.Publish(fbsMsg.Table().Bytes) + for i := 0; i < len(msg.GetTopics()); i++ { + if br, ok := p.pubsubs[msg.GetBroker()]; ok { + err := br.Publish(m) if err != nil { return errors.E(err) } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) } } return nil @@ -320,16 +325,21 @@ func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - fbsMsg := message.GetRootAsMessage(m, 0) - for i := 0; i < fbsMsg.TopicsLength(); i++ { - if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { - err := br.Publish(fbsMsg.Table().Bytes) + msg := &message.Message{} + err := proto.Unmarshal(m, msg) + if err != nil { + p.log.Error("message unmarshal") + } + + // Get payload + for i := 0; i < len(msg.GetTopics()); i++ { + if br, ok := p.pubsubs[msg.GetBroker()]; ok { + err := br.Publish(m) if err != nil { p.log.Error("publish async error", "error", err) - return } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) } } }() diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 544f3ede..efafb2d3 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -1,7 +1,6 @@ package pool import ( - "bytes" "sync" "github.com/fasthttp/websocket" @@ -9,14 +8,12 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { storage map[string]pubsub.PubSub connections *sync.Map resPool sync.Pool - bPool sync.Pool log logger.Logger queue chan *message.Message @@ -36,9 +33,6 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log wp.resPool.New = func() interface{} { return make(map[string]struct{}, 10) } - wp.bPool.New = func() interface{} { - return new(bytes.Buffer) - } // start 10 workers for i := 0; i < 50; i++ { @@ -73,15 +67,6 @@ func (wp *WorkersPool) get() map[string]struct{} { return wp.resPool.Get().(map[string]struct{}) } -func (wp *WorkersPool) putBytes(b *bytes.Buffer) { - b.Reset() - wp.bPool.Put(b) -} - -func (wp *WorkersPool) getBytes() *bytes.Buffer { - return wp.bPool.Get().(*bytes.Buffer) -} - func (wp *WorkersPool) do() { //nolint:gocognit go func() { for { @@ -94,29 +79,27 @@ func (wp *WorkersPool) do() { //nolint:gocognit if msg == nil { continue } - if msg.TopicsLength() == 0 { + if len(msg.GetTopics()) == 0 { continue } - br, ok := wp.storage[utils.AsString(msg.Broker())] + br, ok := wp.storage[msg.Broker] if !ok { - wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage) + wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage) continue } res := wp.get() - bb := wp.getBytes() - for i := 0; i < msg.TopicsLength(); i++ { + for i := 0; i < len(msg.GetTopics()); i++ { // get connections for the particular topic - br.Connections(utils.AsString(msg.Topics(i)), res) + br.Connections(msg.GetTopics()[i], res) } if len(res) == 0 { - for i := 0; i < msg.TopicsLength(); i++ { - wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i))) + for i := 0; i < len(msg.GetTopics()); i++ { + wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) } - wp.putBytes(bb) wp.put(res) continue } @@ -124,8 +107,8 @@ func (wp *WorkersPool) do() { //nolint:gocognit for i := range res { c, ok := wp.connections.Load(i) if !ok { - for i := 0; i < msg.TopicsLength(); i++ { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + for i := 0; i < len(msg.GetTopics()); i++ { + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) } continue } @@ -133,20 +116,15 @@ func (wp *WorkersPool) do() { //nolint:gocognit conn := c.(*connection.Connection) // put data into the bytes buffer - for i := 0; i < msg.PayloadLength(); i++ { - bb.WriteByte(byte(msg.Payload(i))) - } - err := conn.Write(websocket.BinaryMessage, bb.Bytes()) + err := conn.Write(websocket.BinaryMessage, msg.GetPayload()) if err != nil { - for i := 0; i < msg.TopicsLength(); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + for i := 0; i < len(msg.GetTopics()); i++ { + wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) } continue } } - // put bytes buffer back - wp.putBytes(bb) // put map with results back wp.put(res) case <-wp.exit: diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 6c2cacb4..ef44884a 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -1,10 +1,10 @@ package websockets import ( - flatbuffers "github.com/google/flatbuffers/go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" ) // rpc collectors struct @@ -13,39 +13,32 @@ type rpc struct { log logger.Logger } -// Publish ... msg is a flatbuffers decoded payload +// Publish ... msg is a proto decoded payload // see: pkg/pubsub/message.fbs -func (r *rpc) Publish(msg []byte, ok *bool) error { +func (r *rpc) Publish(in *message.Messages, ok *bool) error { const op = errors.Op("broadcast_publish") - r.log.Debug("message published") // just return in case of nil message - if msg == nil { + if in == nil { *ok = true return nil } - fbsMsg := message.GetRootAsMessages(msg, 0) - tmpMsg := &message.Message{} + r.log.Debug("message published", "msg", in.Messages) - b := flatbuffers.NewBuilder(100) + msgLen := len(in.GetMessages()) - for i := 0; i < fbsMsg.MessagesLength(); i++ { - // init a message - fbsMsg.Messages(tmpMsg, i) - - // overhead HERE - orig := serializeMsg(b, tmpMsg) - bb := make([]byte, len(orig)) - copy(bb, orig) + for i := 0; i < msgLen; i++ { + bb, err := proto.Marshal(in.GetMessages()[i]) + if err != nil { + return errors.E(op, err) + } - err := r.plugin.Publish(bb) + err = r.plugin.Publish(bb) if err != nil { *ok = false - b.Reset() return errors.E(op, err) } - b.Reset() } *ok = true @@ -54,68 +47,28 @@ func (r *rpc) Publish(msg []byte, ok *bool) error { // PublishAsync ... // see: pkg/pubsub/message.fbs -func (r *rpc) PublishAsync(msg []byte, ok *bool) error { - r.log.Debug("message published", "msg", msg) +func (r *rpc) PublishAsync(in *message.Messages, ok *bool) error { + const op = errors.Op("publish_async") // just return in case of nil message - if msg == nil { + if in == nil { *ok = true return nil } - fbsMsg := message.GetRootAsMessages(msg, 0) - tmpMsg := &message.Message{} - - b := flatbuffers.NewBuilder(100) + r.log.Debug("message published", "msg", in.Messages) - for i := 0; i < fbsMsg.MessagesLength(); i++ { - // init a message - fbsMsg.Messages(tmpMsg, i) + msgLen := len(in.GetMessages()) - // overhead HERE - orig := serializeMsg(b, tmpMsg) - bb := make([]byte, len(orig)) - copy(bb, orig) + for i := 0; i < msgLen; i++ { + bb, err := proto.Marshal(in.GetMessages()[i]) + if err != nil { + return errors.E(op, err) + } r.plugin.PublishAsync(bb) - b.Reset() } *ok = true return nil } - -func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte { - cmdOff := b.CreateByteString(msg.Command()) - brokerOff := b.CreateByteString(msg.Broker()) - - offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength()) - for j := msg.TopicsLength() - 1; j >= 0; j-- { - offsets[j] = b.CreateByteString(msg.Topics(j)) - } - - message.MessageStartTopicsVector(b, len(offsets)) - - for j := len(offsets) - 1; j >= 0; j-- { - b.PrependUOffsetT(offsets[j]) - } - - tOff := b.EndVector(len(offsets)) - bb := make([]byte, msg.PayloadLength()) - for i := 0; i < msg.PayloadLength(); i++ { - bb[i] = byte(msg.Payload(i)) - } - pOff := b.CreateByteVector(bb) - - message.MessageStart(b) - - message.MessageAddCommand(b, cmdOff) - message.MessageAddBroker(b, brokerOff) - message.MessageAddTopics(b, tOff) - message.MessageAddPayload(b, pOff) - - fOff := message.MessageEnd(b) - b.Finish(fOff) - - return b.FinishedBytes() -} diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index b4122e8a..1bcb3455 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -10,7 +10,6 @@ import ( "testing" "time" - flatbuffers "github.com/google/flatbuffers/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" "github.com/spiral/roadrunner/v2/plugins/config" @@ -19,57 +18,12 @@ import ( "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis" - "github.com/spiral/roadrunner/v2/plugins/kv/payload/generated" + "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/stretchr/testify/assert" ) -func makePayload(b *flatbuffers.Builder, storage string, items []kv.Item) []byte { - b.Reset() - - storageOffset := b.CreateString(storage) - - // //////////////////// ITEMS VECTOR //////////////////////////// - offset := make([]flatbuffers.UOffsetT, len(items)) - for i := len(items) - 1; i >= 0; i-- { - offset[i] = serializeItems(b, items[i]) - } - - generated.PayloadStartItemsVector(b, len(offset)) - - for i := len(offset) - 1; i >= 0; i-- { - b.PrependUOffsetT(offset[i]) - } - - itemsOffset := b.EndVector(len(offset)) - // ///////////////////////////////////////////////////////////////// - - generated.PayloadStart(b) - generated.PayloadAddItems(b, itemsOffset) - generated.PayloadAddStorage(b, storageOffset) - - finalOffset := generated.PayloadEnd(b) - - b.Finish(finalOffset) - - return b.Bytes[b.Head():] -} - -func serializeItems(b *flatbuffers.Builder, item kv.Item) flatbuffers.UOffsetT { - key := b.CreateString(item.Key) - val := b.CreateString(item.Value) - ttl := b.CreateString(item.TTL) - - generated.ItemStart(b) - - generated.ItemAddKey(b, key) - generated.ItemAddValue(b, val) - generated.ItemAddTimeout(b, ttl) - - return generated.ItemEnd(b) -} - func TestKVInit(t *testing.T) { cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) assert.NoError(t, err) @@ -154,17 +108,19 @@ func kvSetTest(t *testing.T) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - b := flatbuffers.NewBuilder(100) - args := makePayload(b, "boltdb-south", []kv.Item{ - { - Key: "key", - Value: "val", + p := &payload.Payload{ + Storage: "boltdb-south", + Items: []*payload.Item{ + { + Key: "key", + Value: "val", + }, }, - }) + } var ok bool - err = client.Call("kv.Set", args, &ok) + err = client.Call("kv.Set", p, &ok) assert.NoError(t, err) assert.True(t, ok, "Set return result") } @@ -175,16 +131,19 @@ func kvHasTest(t *testing.T) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - b := flatbuffers.NewBuilder(100) - args := makePayload(b, "boltdb-south", []kv.Item{ - { - Key: "key", - Value: "val", + p := &payload.Payload{ + Storage: "boltdb-south", + Items: []*payload.Item{ + { + Key: "key", + Value: "val", + }, }, - }) + } + var ret map[string]bool - err = client.Call("kv.Has", args, &ret) + err = client.Call("kv.Has", p, &ret) assert.NoError(t, err) } @@ -250,7 +209,7 @@ func TestBoltDb(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("testBoltDbRPCMethods", testRPCMethods) + t.Run("BOLTDB", testRPCMethods) stopCh <- struct{}{} wg.Wait() @@ -264,40 +223,48 @@ func testRPCMethods(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }) - data := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "a", - Value: "aa", - }, - { - Key: "b", - Value: "bb", - }, - { - Key: "c", - Value: "cc", - TTL: tt, - }, - { - Key: "d", - Value: "dd", + + keys := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, }, - { - Key: "e", - Value: "ee", + } + + data := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + Timeout: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, }, - }) + } var setRes bool @@ -327,20 +294,24 @@ func testRPCMethods(t *testing.T) { assert.Equal(t, "bb", mGet["b"].(string)) tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "a", - TTL: tt2, - }, - { - Key: "b", - TTL: tt2, - }, - { - Key: "d", - TTL: tt2, + + data2 := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "a", + Timeout: tt2, + }, + { + Key: "b", + Timeout: tt2, + }, + { + Key: "d", + Timeout: tt2, + }, }, - }) + } // MEXPIRE var mExpRes bool @@ -349,17 +320,21 @@ func testRPCMethods(t *testing.T) { assert.True(t, mExpRes) // TTL - keys2 := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "d", + keys2 := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, }, - }) + } + ttlRes := make(map[string]interface{}) err = client.Call("kv.TTL", keys2, &ttlRes) assert.NoError(t, err) @@ -373,11 +348,15 @@ func testRPCMethods(t *testing.T) { assert.Len(t, ret, 0) // DELETE - keysDel := makePayload(flatbuffers.NewBuilder(100), "boltdb-rr", []kv.Item{ - { - Key: "e", + keysDel := &payload.Payload{ + Storage: "boltdb-rr", + Items: []*payload.Item{ + { + Key: "e", + }, }, - }) + } + var delRet bool err = client.Call("kv.Delete", keysDel, &delRet) assert.NoError(t, err) @@ -464,40 +443,48 @@ func testRPCMethodsMemcached(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }) - data := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "a", - Value: "aa", - }, - { - Key: "b", - Value: "bb", - }, - { - Key: "c", - Value: "cc", - TTL: tt, - }, - { - Key: "d", - Value: "dd", + + keys := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, }, - { - Key: "e", - Value: "ee", + } + + data := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + Timeout: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, }, - }) + } var setRes bool @@ -527,20 +514,24 @@ func testRPCMethodsMemcached(t *testing.T) { assert.Equal(t, string("bb"), string(mGet["b"].([]byte))) tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "a", - TTL: tt2, - }, - { - Key: "b", - TTL: tt2, - }, - { - Key: "d", - TTL: tt2, + + data2 := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "a", + Timeout: tt2, + }, + { + Key: "b", + Timeout: tt2, + }, + { + Key: "d", + Timeout: tt2, + }, }, - }) + } // MEXPIRE var mExpRes bool @@ -549,17 +540,21 @@ func testRPCMethodsMemcached(t *testing.T) { assert.True(t, mExpRes) // TTL call is not supported for the memcached driver - keys2 := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", + keys2 := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, }, - { - Key: "d", - }, - }) + } + ttlRes := make(map[string]interface{}) err = client.Call("kv.TTL", keys2, &ttlRes) assert.Error(t, err) @@ -573,11 +568,15 @@ func testRPCMethodsMemcached(t *testing.T) { assert.Len(t, ret, 0) // DELETE - keysDel := makePayload(flatbuffers.NewBuilder(100), "memcached-rr", []kv.Item{ - { - Key: "e", + keysDel := &payload.Payload{ + Storage: "memcached-rr", + Items: []*payload.Item{ + { + Key: "e", + }, }, - }) + } + var delRet bool err = client.Call("kv.Delete", keysDel, &delRet) assert.NoError(t, err) @@ -664,40 +663,47 @@ func testRPCMethodsInMemory(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", + keys := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, }, - { - Key: "c", - }, - }) - data := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "a", - Value: "aa", - }, - { - Key: "b", - Value: "bb", - }, - { - Key: "c", - Value: "cc", - TTL: tt, - }, - { - Key: "d", - Value: "dd", - }, - { - Key: "e", - Value: "ee", + } + + data := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + Timeout: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, }, - }) + } var setRes bool @@ -727,20 +733,24 @@ func testRPCMethodsInMemory(t *testing.T) { assert.Equal(t, "bb", mGet["b"].(string)) tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "a", - TTL: tt2, - }, - { - Key: "b", - TTL: tt2, - }, - { - Key: "d", - TTL: tt2, + + data2 := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "a", + Timeout: tt2, + }, + { + Key: "b", + Timeout: tt2, + }, + { + Key: "d", + Timeout: tt2, + }, }, - }) + } // MEXPIRE var mExpRes bool @@ -749,17 +759,21 @@ func testRPCMethodsInMemory(t *testing.T) { assert.True(t, mExpRes) // TTL - keys2 := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "a", + keys2 := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, }, - { - Key: "b", - }, - { - Key: "d", - }, - }) + } + ttlRes := make(map[string]interface{}) err = client.Call("kv.TTL", keys2, &ttlRes) assert.NoError(t, err) @@ -773,11 +787,15 @@ func testRPCMethodsInMemory(t *testing.T) { assert.Len(t, ret, 0) // DELETE - keysDel := makePayload(flatbuffers.NewBuilder(100), "memory-rr", []kv.Item{ - { - Key: "e", + keysDel := &payload.Payload{ + Storage: "memory-rr", + Items: []*payload.Item{ + { + Key: "e", + }, }, - }) + } + var delRet bool err = client.Call("kv.Delete", keysDel, &delRet) assert.NoError(t, err) @@ -864,40 +882,47 @@ func testRPCMethodsRedis(t *testing.T) { // add 5 second ttl tt := time.Now().Add(time.Second * 5).Format(time.RFC3339) - keys := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "a", - }, - { - Key: "b", - }, - { - Key: "c", - }, - }) - data := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "a", - Value: "aa", - }, - { - Key: "b", - Value: "bb", - }, - { - Key: "c", - Value: "cc", - TTL: tt, - }, - { - Key: "d", - Value: "dd", + keys := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "c", + }, }, - { - Key: "e", - Value: "ee", + } + + data := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "a", + Value: "aa", + }, + { + Key: "b", + Value: "bb", + }, + { + Key: "c", + Value: "cc", + Timeout: tt, + }, + { + Key: "d", + Value: "dd", + }, + { + Key: "e", + Value: "ee", + }, }, - }) + } var setRes bool @@ -927,20 +952,23 @@ func testRPCMethodsRedis(t *testing.T) { assert.Equal(t, "bb", mGet["b"].(string)) tt2 := time.Now().Add(time.Second * 10).Format(time.RFC3339) - data2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "a", - TTL: tt2, + data2 := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "a", + Timeout: tt2, + }, + { + Key: "b", + Timeout: tt2, + }, + { + Key: "d", + Timeout: tt2, + }, }, - { - Key: "b", - TTL: tt2, - }, - { - Key: "d", - TTL: tt2, - }, - }) + } // MEXPIRE var mExpRes bool @@ -949,17 +977,21 @@ func testRPCMethodsRedis(t *testing.T) { assert.True(t, mExpRes) // TTL - keys2 := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "a", + keys2 := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "a", + }, + { + Key: "b", + }, + { + Key: "d", + }, }, - { - Key: "b", - }, - { - Key: "d", - }, - }) + } + ttlRes := make(map[string]interface{}) err = client.Call("kv.TTL", keys2, &ttlRes) assert.NoError(t, err) @@ -973,11 +1005,15 @@ func testRPCMethodsRedis(t *testing.T) { assert.Len(t, ret, 0) // DELETE - keysDel := makePayload(flatbuffers.NewBuilder(100), "redis-rr", []kv.Item{ - { - Key: "e", + keysDel := &payload.Payload{ + Storage: "redis-rr", + Items: []*payload.Item{ + { + Key: "e", + }, }, - }) + } + var delRet bool err = client.Call("kv.Delete", keysDel, &delRet) assert.NoError(t, err) diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index b2c756bf..5f472106 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -13,11 +13,9 @@ import ( "time" "github.com/fasthttp/websocket" - flatbuffers "github.com/google/flatbuffers/go" json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" @@ -877,8 +875,8 @@ func publish2(command string, broker string, topics ...string) { panic(err) } } -func messageWS(command string, broker string, payload []byte, topics ...string) *Msg { - return &Msg{ +func messageWS(command string, broker string, payload []byte, topics ...string) *message.Message { + return &message.Message{ Topics: topics, Command: command, Broker: broker, @@ -886,70 +884,17 @@ func messageWS(command string, broker string, payload []byte, topics ...string) } } -func makeMessage(command string, broker string, payload []byte, topics ...string) []byte { - m := []pubsub.Message{ - { - Topics: topics, - Command: command, - Broker: broker, - Payload: payload, +func makeMessage(command string, broker string, payload []byte, topics ...string) *message.Messages { + m := &message.Messages{ + Messages: []*message.Message{ + { + Topics: topics, + Command: command, + Broker: broker, + Payload: payload, + }, }, } - b := flatbuffers.NewBuilder(1) - - return msgs(b, m) -} - -func serializeMsg(b *flatbuffers.Builder, msg pubsub.Message) flatbuffers.UOffsetT { - cmdOff := b.CreateString(msg.Command) - brokerOff := b.CreateString(msg.Broker) - - offsets := make([]flatbuffers.UOffsetT, len(msg.Topics)) - for j := len(msg.Topics) - 1; j >= 0; j-- { - offsets[j] = b.CreateString(msg.Topics[j]) - } - - message.MessageStartTopicsVector(b, len(offsets)) - - for j := len(offsets) - 1; j >= 0; j-- { - b.PrependUOffsetT(offsets[j]) - } - - tOff := b.EndVector(len(offsets)) - pOff := b.CreateByteVector(msg.Payload) - - message.MessageStart(b) - - message.MessageAddCommand(b, cmdOff) - message.MessageAddBroker(b, brokerOff) - message.MessageAddTopics(b, tOff) - message.MessageAddPayload(b, pOff) - - return message.MessageEnd(b) -} - -func msgs(b *flatbuffers.Builder, msgs []pubsub.Message) []byte { - b.Reset() - - mOff := make([]flatbuffers.UOffsetT, len(msgs)) - - for i := len(msgs) - 1; i >= 0; i-- { - mOff[i] = serializeMsg(b, msgs[i]) - } - - message.MessagesStartMessagesVector(b, len(mOff)) - - for i := len(mOff) - 1; i >= 0; i-- { - b.PrependUOffsetT(mOff[i]) - } - - msgsOff := b.EndVector(len(msgs)) - - message.MessagesStart(b) - message.MessagesAddMessages(b, msgsOff) - fOff := message.MessagesEnd(b) - b.Finish(fOff) - - return b.Bytes[b.Head():] + return m } -- cgit v1.2.3 From a8baaaae403a556b6d5d76bc2f7eb46cca7bfb15 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 8 Jun 2021 18:41:54 +0300 Subject: - Move ws memory pub-sub plugin into the websockets folder - Update CHANGELOG Signed-off-by: Valery Piashchynski --- CHANGELOG.md | 2 +- plugins/memory/plugin.go | 105 ---------------------- plugins/websockets/memory/inMemory.go | 95 ++++++++++++++++++++ plugins/websockets/plugin.go | 6 ++ tests/plugins/websockets/websocket_plugin_test.go | 5 -- 5 files changed, 102 insertions(+), 111 deletions(-) delete mode 100644 plugins/memory/plugin.go create mode 100644 plugins/websockets/memory/inMemory.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 80ee8681..6f4eb607 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ v2.3.0 (08.06.2021) - ✏️ Brand new `broadcast` plugin now has the name - `websockets` with broadcast capabilities. It can handle hundreds of thousands websocket connections very efficiently (~300k messages per second with 1k connected clients, in-memory bus on 2CPU cores and 1GB of RAM) -- ✏️ Flatbuffers binary messages for the `websockets` RPC calls under the hood. +- ✏️ Protobuf binary messages for the `websockets` RPC calls under the hood. - ✏️ Json-schemas for the config file v1.0 (it also registered in [schemastore.org](https://github.com/SchemaStore/schemastore/pull/1614)) - ✏️ `latest` docker image tag supported now (but we strongly recommend using a versioned tag (like `0.2.3`) instead) diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go deleted file mode 100644 index d724dff9..00000000 --- a/plugins/memory/plugin.go +++ /dev/null @@ -1,105 +0,0 @@ -package memory - -import ( - "sync" - - "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" - "github.com/spiral/roadrunner/v2/plugins/logger" - "google.golang.org/protobuf/proto" -) - -const ( - PluginName string = "memory" -) - -type Plugin struct { - sync.RWMutex - log logger.Logger - - // channel with the messages from the RPC - pushCh chan []byte - // user-subscribed topics - storage bst.Storage -} - -func (p *Plugin) Init(log logger.Logger) error { - p.log = log - p.pushCh = make(chan []byte, 10) - p.storage = bst.NewBST() - return nil -} - -// Available interface implementation for the plugin -func (p *Plugin) Available() {} - -// Name is endure.Named interface implementation -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Publish(message []byte) error { - p.pushCh <- message - return nil -} - -func (p *Plugin) PublishAsync(message []byte) { - go func() { - p.pushCh <- message - }() -} - -func (p *Plugin) Subscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Insert(connectionID, topics[i]) - } - return nil -} - -func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Remove(connectionID, topics[i]) - } - return nil -} - -func (p *Plugin) Connections(topic string, res map[string]struct{}) { - p.RLock() - defer p.RUnlock() - - ret := p.storage.Get(topic) - for rr := range ret { - res[rr] = struct{}{} - } -} - -func (p *Plugin) Next() (*message.Message, error) { - msg := <-p.pushCh - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() - - m := &message.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - return nil, err - } - - // push only messages, which are subscribed - // TODO better??? - for i := 0; i < len(m.GetTopics()); i++ { - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(m.GetTopics()[i]); ok { - return m, nil - } - } - return nil, nil -} diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go new file mode 100644 index 00000000..deb927ed --- /dev/null +++ b/plugins/websockets/memory/inMemory.go @@ -0,0 +1,95 @@ +package memory + +import ( + "sync" + + "github.com/spiral/roadrunner/v2/pkg/bst" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" +) + +type Plugin struct { + sync.RWMutex + log logger.Logger + + // channel with the messages from the RPC + pushCh chan []byte + // user-subscribed topics + storage bst.Storage +} + +func NewInMemory(log logger.Logger) pubsub.PubSub { + return &Plugin{ + log: log, + pushCh: make(chan []byte, 10), + storage: bst.NewBST(), + } +} + +func (p *Plugin) Publish(message []byte) error { + p.pushCh <- message + return nil +} + +func (p *Plugin) PublishAsync(message []byte) { + go func() { + p.pushCh <- message + }() +} + +func (p *Plugin) Subscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() + for i := 0; i < len(topics); i++ { + p.storage.Insert(connectionID, topics[i]) + } + return nil +} + +func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() + for i := 0; i < len(topics); i++ { + p.storage.Remove(connectionID, topics[i]) + } + return nil +} + +func (p *Plugin) Connections(topic string, res map[string]struct{}) { + p.RLock() + defer p.RUnlock() + + ret := p.storage.Get(topic) + for rr := range ret { + res[rr] = struct{}{} + } +} + +func (p *Plugin) Next() (*message.Message, error) { + msg := <-p.pushCh + if msg == nil { + return nil, nil + } + + p.RLock() + defer p.RUnlock() + + m := &message.Message{} + err := proto.Unmarshal(msg, m) + if err != nil { + return nil, err + } + + // push only messages, which are subscribed + // TODO better??? + for i := 0; i < len(m.GetTopics()); i++ { + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(m.GetTopics()[i]); ok { + return m, nil + } + } + return nil, nil +} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 39a4e139..cf21fffa 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -23,6 +23,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" + "github.com/spiral/roadrunner/v2/plugins/websockets/memory" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" "google.golang.org/protobuf/proto" @@ -90,6 +91,11 @@ func (p *Plugin) Serve() chan error { p.Lock() defer p.Unlock() + // attach default driver + if len(p.pubsubs) == 0 { + p.pubsubs["memory"] = memory.NewInMemory(p.log) + } + p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 5f472106..593085b7 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -20,7 +20,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" @@ -168,7 +167,6 @@ func TestWSRedisAndMemory(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -462,7 +460,6 @@ func TestWSMemoryDeny(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -590,7 +587,6 @@ func TestWSMemoryStop(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) @@ -683,7 +679,6 @@ func TestWSMemoryOk(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, - &memory.Plugin{}, ) assert.NoError(t, err) -- cgit v1.2.3 From cc271dceb13d3929f0382311dfce3dfed2ce04ce Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 8 Jun 2021 22:04:28 +0300 Subject: - Add protobuf versioning Signed-off-by: Valery Piashchynski --- pkg/proto/kv/v1beta/kv.pb.go | 236 +++++++++++++++++++++ pkg/proto/kv/v1beta/kv.proto | 17 ++ pkg/proto/websockets/v1beta/websockets.pb.go | 237 ++++++++++++++++++++++ pkg/proto/websockets/v1beta/websockets.proto | 15 ++ pkg/pubsub/interface.go | 4 +- pkg/pubsub/message.proto | 15 -- pkg/pubsub/message/message.pb.go | 235 --------------------- plugins/kv/drivers/boltdb/driver.go | 6 +- plugins/kv/drivers/memcached/driver.go | 6 +- plugins/kv/drivers/memory/driver.go | 18 +- plugins/kv/drivers/redis/driver.go | 6 +- plugins/kv/interface.go | 6 +- plugins/kv/payload.proto | 17 -- plugins/kv/payload/payload.pb.go | 234 --------------------- plugins/kv/rpc.go | 14 +- plugins/redis/fanin.go | 10 +- plugins/redis/plugin.go | 8 +- plugins/websockets/executor/executor.go | 4 +- plugins/websockets/memory/inMemory.go | 6 +- plugins/websockets/plugin.go | 14 +- plugins/websockets/pool/workers_pool.go | 8 +- plugins/websockets/rpc.go | 6 +- tests/plugins/kv/storage_plugin_test.go | 4 +- tests/plugins/websockets/websocket_plugin_test.go | 14 +- 24 files changed, 570 insertions(+), 570 deletions(-) create mode 100644 pkg/proto/kv/v1beta/kv.pb.go create mode 100644 pkg/proto/kv/v1beta/kv.proto create mode 100644 pkg/proto/websockets/v1beta/websockets.pb.go create mode 100644 pkg/proto/websockets/v1beta/websockets.proto delete mode 100644 pkg/pubsub/message.proto delete mode 100644 pkg/pubsub/message/message.pb.go delete mode 100644 plugins/kv/payload.proto delete mode 100644 plugins/kv/payload/payload.pb.go diff --git a/pkg/proto/kv/v1beta/kv.pb.go b/pkg/proto/kv/v1beta/kv.pb.go new file mode 100644 index 00000000..76450869 --- /dev/null +++ b/pkg/proto/kv/v1beta/kv.pb.go @@ -0,0 +1,236 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.16.0 +// source: kv.proto + +package kvv1beta + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Payload struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // could be an enum in the future + Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"` + Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"` +} + +func (x *Payload) Reset() { + *x = Payload{} + if protoimpl.UnsafeEnabled { + mi := &file_kv_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Payload) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Payload) ProtoMessage() {} + +func (x *Payload) ProtoReflect() protoreflect.Message { + mi := &file_kv_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Payload.ProtoReflect.Descriptor instead. +func (*Payload) Descriptor() ([]byte, []int) { + return file_kv_proto_rawDescGZIP(), []int{0} +} + +func (x *Payload) GetStorage() string { + if x != nil { + return x.Storage + } + return "" +} + +func (x *Payload) GetItems() []*Item { + if x != nil { + return x.Items + } + return nil +} + +type Item struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + // RFC 3339 + Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"` +} + +func (x *Item) Reset() { + *x = Item{} + if protoimpl.UnsafeEnabled { + mi := &file_kv_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Item) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Item) ProtoMessage() {} + +func (x *Item) ProtoReflect() protoreflect.Message { + mi := &file_kv_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Item.ProtoReflect.Descriptor instead. +func (*Item) Descriptor() ([]byte, []int) { + return file_kv_proto_rawDescGZIP(), []int{1} +} + +func (x *Item) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *Item) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +func (x *Item) GetTimeout() string { + if x != nil { + return x.Timeout + } + return "" +} + +var File_kv_proto protoreflect.FileDescriptor + +var file_kv_proto_rawDesc = []byte{ + 0x0a, 0x08, 0x6b, 0x76, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x09, 0x6b, 0x76, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x4a, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x69, 0x74, + 0x65, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x6b, 0x76, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, + 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x42, 0x0d, 0x5a, 0x0b, 0x2e, + 0x2f, 0x3b, 0x6b, 0x76, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, +} + +var ( + file_kv_proto_rawDescOnce sync.Once + file_kv_proto_rawDescData = file_kv_proto_rawDesc +) + +func file_kv_proto_rawDescGZIP() []byte { + file_kv_proto_rawDescOnce.Do(func() { + file_kv_proto_rawDescData = protoimpl.X.CompressGZIP(file_kv_proto_rawDescData) + }) + return file_kv_proto_rawDescData +} + +var file_kv_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_kv_proto_goTypes = []interface{}{ + (*Payload)(nil), // 0: kv.v1beta.Payload + (*Item)(nil), // 1: kv.v1beta.Item +} +var file_kv_proto_depIdxs = []int32{ + 1, // 0: kv.v1beta.Payload.items:type_name -> kv.v1beta.Item + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_kv_proto_init() } +func file_kv_proto_init() { + if File_kv_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_kv_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Payload); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_kv_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Item); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_kv_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_kv_proto_goTypes, + DependencyIndexes: file_kv_proto_depIdxs, + MessageInfos: file_kv_proto_msgTypes, + }.Build() + File_kv_proto = out.File + file_kv_proto_rawDesc = nil + file_kv_proto_goTypes = nil + file_kv_proto_depIdxs = nil +} diff --git a/pkg/proto/kv/v1beta/kv.proto b/pkg/proto/kv/v1beta/kv.proto new file mode 100644 index 00000000..1ec0e6b7 --- /dev/null +++ b/pkg/proto/kv/v1beta/kv.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package kv.v1beta; +option go_package = "./;kvv1beta"; + +message Payload { + // could be an enum in the future + string storage = 1; + repeated Item items = 2; +} + +message Item { + string key = 1; + string value = 2; + // RFC 3339 + string timeout = 3; +} diff --git a/pkg/proto/websockets/v1beta/websockets.pb.go b/pkg/proto/websockets/v1beta/websockets.pb.go new file mode 100644 index 00000000..f04c3cb2 --- /dev/null +++ b/pkg/proto/websockets/v1beta/websockets.pb.go @@ -0,0 +1,237 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.16.0 +// source: websockets.proto + +package websocketsv1beta + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Command string `protobuf:"bytes,1,opt,name=command,proto3" json:"command,omitempty"` + Broker string `protobuf:"bytes,2,opt,name=broker,proto3" json:"broker,omitempty"` + Topics []string `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"` + Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_websockets_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_websockets_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_websockets_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetCommand() string { + if x != nil { + return x.Command + } + return "" +} + +func (x *Message) GetBroker() string { + if x != nil { + return x.Broker + } + return "" +} + +func (x *Message) GetTopics() []string { + if x != nil { + return x.Topics + } + return nil +} + +func (x *Message) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +type Messages struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` +} + +func (x *Messages) Reset() { + *x = Messages{} + if protoimpl.UnsafeEnabled { + mi := &file_websockets_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Messages) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Messages) ProtoMessage() {} + +func (x *Messages) ProtoReflect() protoreflect.Message { + mi := &file_websockets_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Messages.ProtoReflect.Descriptor instead. +func (*Messages) Descriptor() ([]byte, []int) { + return file_websockets_proto_rawDescGZIP(), []int{1} +} + +func (x *Messages) GetMessages() []*Message { + if x != nil { + return x.Messages + } + return nil +} + +var File_websockets_proto protoreflect.FileDescriptor + +var file_websockets_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x11, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, 0x76, + 0x31, 0x62, 0x65, 0x74, 0x61, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x62, 0x72, + 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x62, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x18, 0x03, 0x20, 0x03, + 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x42, 0x0a, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, + 0x12, 0x36, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x77, 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x2e, + 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x15, 0x5a, 0x13, 0x2e, 0x2f, 0x3b, 0x77, + 0x65, 0x62, 0x73, 0x6f, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x76, 0x31, 0x62, 0x65, 0x74, 0x61, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_websockets_proto_rawDescOnce sync.Once + file_websockets_proto_rawDescData = file_websockets_proto_rawDesc +) + +func file_websockets_proto_rawDescGZIP() []byte { + file_websockets_proto_rawDescOnce.Do(func() { + file_websockets_proto_rawDescData = protoimpl.X.CompressGZIP(file_websockets_proto_rawDescData) + }) + return file_websockets_proto_rawDescData +} + +var file_websockets_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_websockets_proto_goTypes = []interface{}{ + (*Message)(nil), // 0: websockets.v1beta.Message + (*Messages)(nil), // 1: websockets.v1beta.Messages +} +var file_websockets_proto_depIdxs = []int32{ + 0, // 0: websockets.v1beta.Messages.messages:type_name -> websockets.v1beta.Message + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_websockets_proto_init() } +func file_websockets_proto_init() { + if File_websockets_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_websockets_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_websockets_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Messages); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_websockets_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_websockets_proto_goTypes, + DependencyIndexes: file_websockets_proto_depIdxs, + MessageInfos: file_websockets_proto_msgTypes, + }.Build() + File_websockets_proto = out.File + file_websockets_proto_rawDesc = nil + file_websockets_proto_goTypes = nil + file_websockets_proto_depIdxs = nil +} diff --git a/pkg/proto/websockets/v1beta/websockets.proto b/pkg/proto/websockets/v1beta/websockets.proto new file mode 100644 index 00000000..a61da93d --- /dev/null +++ b/pkg/proto/websockets/v1beta/websockets.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package websockets.v1beta; +option go_package = "./;websocketsv1beta"; + +message Message { + string command = 1; + string broker = 2; + repeated string topics = 3; + bytes payload = 4; +} + +message Messages { + repeated Message messages = 1; +} diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go index eb65b4b7..4926cad6 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/pubsub/interface.go @@ -1,6 +1,6 @@ package pubsub -import "github.com/spiral/roadrunner/v2/pkg/pubsub/message" +import websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" /* This interface is in BETA. It might be changed. @@ -42,5 +42,5 @@ type Publisher interface { // Reader interface should return next message type Reader interface { - Next() (*message.Message, error) + Next() (*websocketsv1.Message, error) } diff --git a/pkg/pubsub/message.proto b/pkg/pubsub/message.proto deleted file mode 100644 index 772e7611..00000000 --- a/pkg/pubsub/message.proto +++ /dev/null @@ -1,15 +0,0 @@ -syntax = "proto3"; - -package pubsub.v1; -option go_package = "./message"; - -message Message { - string command = 1; - string broker = 2; - repeated string topics = 3; - bytes payload = 4; -} - -message Messages { - repeated Message messages = 1; -} diff --git a/pkg/pubsub/message/message.pb.go b/pkg/pubsub/message/message.pb.go deleted file mode 100644 index 3a73c39c..00000000 --- a/pkg/pubsub/message/message.pb.go +++ /dev/null @@ -1,235 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.26.0 -// protoc v3.16.0 -// source: message.proto - -package message - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type Message struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Command string `protobuf:"bytes,1,opt,name=command,proto3" json:"command,omitempty"` - Broker string `protobuf:"bytes,2,opt,name=broker,proto3" json:"broker,omitempty"` - Topics []string `protobuf:"bytes,3,rep,name=topics,proto3" json:"topics,omitempty"` - Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` -} - -func (x *Message) Reset() { - *x = Message{} - if protoimpl.UnsafeEnabled { - mi := &file_message_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Message) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Message) ProtoMessage() {} - -func (x *Message) ProtoReflect() protoreflect.Message { - mi := &file_message_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Message.ProtoReflect.Descriptor instead. -func (*Message) Descriptor() ([]byte, []int) { - return file_message_proto_rawDescGZIP(), []int{0} -} - -func (x *Message) GetCommand() string { - if x != nil { - return x.Command - } - return "" -} - -func (x *Message) GetBroker() string { - if x != nil { - return x.Broker - } - return "" -} - -func (x *Message) GetTopics() []string { - if x != nil { - return x.Topics - } - return nil -} - -func (x *Message) GetPayload() []byte { - if x != nil { - return x.Payload - } - return nil -} - -type Messages struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Messages []*Message `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` -} - -func (x *Messages) Reset() { - *x = Messages{} - if protoimpl.UnsafeEnabled { - mi := &file_message_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Messages) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Messages) ProtoMessage() {} - -func (x *Messages) ProtoReflect() protoreflect.Message { - mi := &file_message_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Messages.ProtoReflect.Descriptor instead. -func (*Messages) Descriptor() ([]byte, []int) { - return file_message_proto_rawDescGZIP(), []int{1} -} - -func (x *Messages) GetMessages() []*Message { - if x != nil { - return x.Messages - } - return nil -} - -var File_message_proto protoreflect.FileDescriptor - -var file_message_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x09, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x76, 0x31, 0x22, 0x6d, 0x0a, 0x07, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, - 0x16, 0x0a, 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x73, 0x12, - 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x3a, 0x0a, 0x08, 0x4d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x12, 0x2e, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, - 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, 0x08, 0x6d, 0x65, 0x73, - 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_message_proto_rawDescOnce sync.Once - file_message_proto_rawDescData = file_message_proto_rawDesc -) - -func file_message_proto_rawDescGZIP() []byte { - file_message_proto_rawDescOnce.Do(func() { - file_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_message_proto_rawDescData) - }) - return file_message_proto_rawDescData -} - -var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_message_proto_goTypes = []interface{}{ - (*Message)(nil), // 0: pubsub.v1.Message - (*Messages)(nil), // 1: pubsub.v1.Messages -} -var file_message_proto_depIdxs = []int32{ - 0, // 0: pubsub.v1.Messages.messages:type_name -> pubsub.v1.Message - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_message_proto_init() } -func file_message_proto_init() { - if File_message_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Message); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Messages); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_message_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_message_proto_goTypes, - DependencyIndexes: file_message_proto_depIdxs, - MessageInfos: file_message_proto_msgTypes, - }.Build() - File_message_proto = out.File - file_message_proto_rawDesc = nil - file_message_proto_goTypes = nil - file_message_proto_depIdxs = nil -} diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index af107dff..ba873513 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -10,9 +10,9 @@ import ( "time" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" bolt "go.etcd.io/bbolt" @@ -214,7 +214,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { } // Set puts the K/V to the bolt -func (d *Driver) Set(items ...*payload.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("boltdb_driver_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -324,7 +324,7 @@ func (d *Driver) Delete(keys ...string) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (d *Driver) MExpire(items ...*payload.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("boltdb_driver_mexpire") for i := range items { if items[i].Timeout == "" || strings.TrimSpace(items[i].Key) == "" { diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index 9c4689c4..8ea515d0 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,9 +6,9 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -135,7 +135,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) Set(items ...*payload.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("memcached_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -176,7 +176,7 @@ func (d *Driver) Set(items ...*payload.Item) error { // MExpire Expiration is the cache expiration time, in seconds: either a relative // time from now (up to 1 month), or an absolute Unix epoch time. // Zero means the Item has no expiration time. -func (d *Driver) MExpire(items ...*payload.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("memcached_plugin_mexpire") for i := range items { if items[i] == nil { diff --git a/plugins/kv/drivers/memory/driver.go b/plugins/kv/drivers/memory/driver.go index fae2c831..3158adee 100644 --- a/plugins/kv/drivers/memory/driver.go +++ b/plugins/kv/drivers/memory/driver.go @@ -6,9 +6,9 @@ import ( "time" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" ) @@ -72,7 +72,7 @@ func (s *Driver) Get(key string) ([]byte, error) { if data, exist := s.heap.Load(key); exist { // here might be a panic // but data only could be a string, see Set function - return utils.AsBytes(data.(*payload.Item).Value), nil + return utils.AsBytes(data.(*kvv1.Item).Value), nil } return nil, nil } @@ -95,14 +95,14 @@ func (s *Driver) MGet(keys ...string) (map[string]interface{}, error) { for i := range keys { if value, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = value.(*payload.Item).Value + m[keys[i]] = value.(*kvv1.Item).Value } } return m, nil } -func (s *Driver) Set(items ...*payload.Item) error { +func (s *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -128,7 +128,7 @@ func (s *Driver) Set(items ...*payload.Item) error { // MExpire sets the expiration time to the key // If key already has the expiration time, it will be overwritten -func (s *Driver) MExpire(items ...*payload.Item) error { +func (s *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("in_memory_plugin_mexpire") for i := range items { if items[i] == nil { @@ -145,11 +145,11 @@ func (s *Driver) MExpire(items ...*payload.Item) error { if err != nil { return errors.E(op, err) } - tmp := pItem.(*payload.Item) + tmp := pItem.(*kvv1.Item) // guess that t is in the future // in memory is just FOR TESTING PURPOSES // LOGIC ISN'T IDEAL - s.heap.Store(items[i].Key, &payload.Item{ + s.heap.Store(items[i].Key, &kvv1.Item{ Key: items[i].Key, Value: tmp.Value, Timeout: items[i].Timeout, @@ -178,7 +178,7 @@ func (s *Driver) TTL(keys ...string) (map[string]interface{}, error) { for i := range keys { if item, ok := s.heap.Load(keys[i]); ok { - m[keys[i]] = item.(*payload.Item).Timeout + m[keys[i]] = item.(*kvv1.Item).Timeout } } return m, nil @@ -216,7 +216,7 @@ func (s *Driver) gc() { case now := <-ticker.C: // check every second s.heap.Range(func(key, value interface{}) bool { - v := value.(*payload.Item) + v := value.(*kvv1.Item) if v.Timeout == "" { return true } diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/kv/drivers/redis/driver.go index 5890367b..0aaa6352 100644 --- a/plugins/kv/drivers/redis/driver.go +++ b/plugins/kv/drivers/redis/driver.go @@ -7,9 +7,9 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -138,7 +138,7 @@ func (d *Driver) MGet(keys ...string) (map[string]interface{}, error) { // // Use expiration for `SETEX`-like behavior. // Zero expiration means the key has no expiration time. -func (d *Driver) Set(items ...*payload.Item) error { +func (d *Driver) Set(items ...*kvv1.Item) error { const op = errors.Op("redis_driver_set") if items == nil { return errors.E(op, errors.NoKeys) @@ -187,7 +187,7 @@ func (d *Driver) Delete(keys ...string) error { // MExpire https://redis.io/commands/expire // timeout in RFC3339 -func (d *Driver) MExpire(items ...*payload.Item) error { +func (d *Driver) MExpire(items ...*kvv1.Item) error { const op = errors.Op("redis_driver_mexpire") now := time.Now() for _, item := range items { diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index abcd7f47..7841f9a2 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,6 +1,6 @@ package kv -import "github.com/spiral/roadrunner/v2/plugins/kv/payload" +import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" // Storage represents single abstract storage. type Storage interface { @@ -16,10 +16,10 @@ type Storage interface { // Set used to upload item to KV with TTL // 0 value in TTL means no TTL - Set(items ...*payload.Item) error + Set(items ...*kvv1.Item) error // MExpire sets the TTL for multiply keys - MExpire(items ...*payload.Item) error + MExpire(items ...*kvv1.Item) error // TTL return the rest time to live for provided keys // Not supported for the memcached and boltdb diff --git a/plugins/kv/payload.proto b/plugins/kv/payload.proto deleted file mode 100644 index 66a22ee4..00000000 --- a/plugins/kv/payload.proto +++ /dev/null @@ -1,17 +0,0 @@ -syntax = "proto3"; - -package kv.v1; -option go_package = "./payload"; - -message Payload { - // could be an enum in the future - string storage = 1; - repeated Item items = 2; -} - -message Item { - string key = 1; - string value = 2; - // RFC 3339 - string timeout = 3; -} diff --git a/plugins/kv/payload/payload.pb.go b/plugins/kv/payload/payload.pb.go deleted file mode 100644 index 60ad9f13..00000000 --- a/plugins/kv/payload/payload.pb.go +++ /dev/null @@ -1,234 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.26.0 -// protoc v3.16.0 -// source: payload.proto - -package payload - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type Payload struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - // could be an enum in the future - Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"` - Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"` -} - -func (x *Payload) Reset() { - *x = Payload{} - if protoimpl.UnsafeEnabled { - mi := &file_payload_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Payload) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Payload) ProtoMessage() {} - -func (x *Payload) ProtoReflect() protoreflect.Message { - mi := &file_payload_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Payload.ProtoReflect.Descriptor instead. -func (*Payload) Descriptor() ([]byte, []int) { - return file_payload_proto_rawDescGZIP(), []int{0} -} - -func (x *Payload) GetStorage() string { - if x != nil { - return x.Storage - } - return "" -} - -func (x *Payload) GetItems() []*Item { - if x != nil { - return x.Items - } - return nil -} - -type Item struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"` -} - -func (x *Item) Reset() { - *x = Item{} - if protoimpl.UnsafeEnabled { - mi := &file_payload_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Item) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Item) ProtoMessage() {} - -func (x *Item) ProtoReflect() protoreflect.Message { - mi := &file_payload_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Item.ProtoReflect.Descriptor instead. -func (*Item) Descriptor() ([]byte, []int) { - return file_payload_proto_rawDescGZIP(), []int{1} -} - -func (x *Item) GetKey() string { - if x != nil { - return x.Key - } - return "" -} - -func (x *Item) GetValue() string { - if x != nil { - return x.Value - } - return "" -} - -func (x *Item) GetTimeout() string { - if x != nil { - return x.Timeout - } - return "" -} - -var File_payload_proto protoreflect.FileDescriptor - -var file_payload_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x02, 0x6b, 0x76, 0x22, 0x43, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x18, - 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x1e, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x6b, 0x76, 0x2e, 0x49, 0x74, 0x65, - 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48, 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, - 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, - 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, - 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, - 0x75, 0x74, 0x42, 0x0b, 0x5a, 0x09, 0x2e, 0x2f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_payload_proto_rawDescOnce sync.Once - file_payload_proto_rawDescData = file_payload_proto_rawDesc -) - -func file_payload_proto_rawDescGZIP() []byte { - file_payload_proto_rawDescOnce.Do(func() { - file_payload_proto_rawDescData = protoimpl.X.CompressGZIP(file_payload_proto_rawDescData) - }) - return file_payload_proto_rawDescData -} - -var file_payload_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_payload_proto_goTypes = []interface{}{ - (*Payload)(nil), // 0: kv.Payload - (*Item)(nil), // 1: kv.Item -} -var file_payload_proto_depIdxs = []int32{ - 1, // 0: kv.Payload.items:type_name -> kv.Item - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_payload_proto_init() } -func file_payload_proto_init() { - if File_payload_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_payload_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Payload); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_payload_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Item); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_payload_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_payload_proto_goTypes, - DependencyIndexes: file_payload_proto_depIdxs, - MessageInfos: file_payload_proto_msgTypes, - }.Build() - File_payload_proto = out.File - file_payload_proto_rawDesc = nil - file_payload_proto_goTypes = nil - file_payload_proto_depIdxs = nil -} diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index 009a5c56..a9efe0c4 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,7 +2,7 @@ package kv import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" + kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -17,7 +17,7 @@ type rpc struct { } // Has accept []*payload.Payload proto payload with Storage and Item -func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error { +func (r *rpc) Has(in *kvv1.Payload, res *map[string]bool) error { const op = errors.Op("rpc_has") if in.Storage == "" { @@ -46,7 +46,7 @@ func (r *rpc) Has(in *payload.Payload, res *map[string]bool) error { } // Set accept proto payload with Storage and Item -func (r *rpc) Set(in *payload.Payload, ok *bool) error { +func (r *rpc) Set(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_set") if st, exists := r.storages[in.GetStorage()]; exists { @@ -65,7 +65,7 @@ func (r *rpc) Set(in *payload.Payload, ok *bool) error { } // MGet accept proto payload with Storage and Item -func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error { +func (r *rpc) MGet(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_mget") keys := make([]string, 0, len(in.GetItems())) @@ -89,7 +89,7 @@ func (r *rpc) MGet(in *payload.Payload, res *map[string]interface{}) error { } // MExpire accept proto payload with Storage and Item -func (r *rpc) MExpire(in *payload.Payload, ok *bool) error { +func (r *rpc) MExpire(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rpc_mexpire") if st, exists := r.storages[in.GetStorage()]; exists { @@ -108,7 +108,7 @@ func (r *rpc) MExpire(in *payload.Payload, ok *bool) error { } // TTL accept proto payload with Storage and Item -func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error { +func (r *rpc) TTL(in *kvv1.Payload, res *map[string]interface{}) error { const op = errors.Op("rpc_ttl") keys := make([]string, 0, len(in.GetItems())) @@ -131,7 +131,7 @@ func (r *rpc) TTL(in *payload.Payload, res *map[string]interface{}) error { } // Delete accept proto payload with Storage and Item -func (r *rpc) Delete(in *payload.Payload, ok *bool) error { +func (r *rpc) Delete(in *kvv1.Payload, ok *bool) error { const op = errors.Op("rcp_delete") keys := make([]string, 0, len(in.GetItems())) diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 76bef400..ac9ebcc2 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,7 +4,7 @@ import ( "context" "sync" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" @@ -23,13 +23,13 @@ type FanIn struct { log logger.Logger // out channel with all subs - out chan *message.Message + out chan *websocketsv1.Message exit chan struct{} } func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { - out := make(chan *message.Message, 100) + out := make(chan *websocketsv1.Message, 100) fi := &FanIn{ out: out, client: redisClient, @@ -67,7 +67,7 @@ func (fi *FanIn) read() { return } - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(utils.AsBytes(msg.Payload), m) if err != nil { fi.log.Error("message unmarshal") @@ -97,6 +97,6 @@ func (fi *FanIn) stop() error { return nil } -func (fi *FanIn) consume() <-chan *message.Message { +func (fi *FanIn) consume() <-chan *websocketsv1.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 695e7b08..47ffeb39 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" @@ -107,7 +107,7 @@ func (p *Plugin) Publish(msg []byte) error { p.Lock() defer p.Unlock() - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(msg, m) if err != nil { return errors.E(err) @@ -126,7 +126,7 @@ func (p *Plugin) PublishAsync(msg []byte) { go func() { p.Lock() defer p.Unlock() - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(msg, m) if err != nil { p.log.Error("message unmarshal error") @@ -209,6 +209,6 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } // Next return next message -func (p *Plugin) Next() (*message.Message, error) { +func (p *Plugin) Next() (*websocketsv1.Message, error) { return <-p.fanin.consume(), nil } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 951c9a1a..e3d47166 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -8,8 +8,8 @@ import ( "github.com/fasthttp/websocket" json "github.com/json-iterator/go" "github.com/spiral/errors" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" @@ -64,7 +64,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit return errors.E(op, err) } - msg := &message.Message{} + msg := &websocketsv1.Message{} err = json.Unmarshal(data, msg) if err != nil { diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go index deb927ed..cef28182 100644 --- a/plugins/websockets/memory/inMemory.go +++ b/plugins/websockets/memory/inMemory.go @@ -4,8 +4,8 @@ import ( "sync" "github.com/spiral/roadrunner/v2/pkg/bst" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) @@ -67,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) { } } -func (p *Plugin) Next() (*message.Message, error) { +func (p *Plugin) Next() (*websocketsv1.Message, error) { msg := <-p.pushCh if msg == nil { return nil, nil @@ -76,7 +76,7 @@ func (p *Plugin) Next() (*message.Message, error) { p.RLock() defer p.RUnlock() - m := &message.Message{} + m := &websocketsv1.Message{} err := proto.Unmarshal(msg, m) if err != nil { return nil, err diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index cf21fffa..6ddd609c 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -14,8 +14,8 @@ import ( "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -80,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.serveExit = make(chan struct{}) p.server = server + // attach default driver + p.pubsubs["memory"] = memory.NewInMemory(p.log) + return nil } @@ -91,11 +94,6 @@ func (p *Plugin) Serve() chan error { p.Lock() defer p.Unlock() - // attach default driver - if len(p.pubsubs) == 0 { - p.pubsubs["memory"] = memory.NewInMemory(p.log) - } - p.phpPool, err = p.server.NewWorkerPool(context.Background(), phpPool.Config{ Debug: p.cfg.Pool.Debug, NumWorkers: p.cfg.Pool.NumWorkers, @@ -307,7 +305,7 @@ func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() - msg := &message.Message{} + msg := &websocketsv1.Message{} err := proto.Unmarshal(m, msg) if err != nil { return err @@ -331,7 +329,7 @@ func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - msg := &message.Message{} + msg := &websocketsv1.Message{} err := proto.Unmarshal(m, msg) if err != nil { p.log.Error("message unmarshal") diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index efafb2d3..1a7c6f8a 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,8 +4,8 @@ import ( "sync" "github.com/fasthttp/websocket" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" ) @@ -16,7 +16,7 @@ type WorkersPool struct { resPool sync.Pool log logger.Logger - queue chan *message.Message + queue chan *websocketsv1.Message exit chan struct{} } @@ -24,7 +24,7 @@ type WorkersPool struct { func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *message.Message, 100), + queue: make(chan *websocketsv1.Message, 100), storage: pubsubs, log: log, exit: make(chan struct{}), @@ -42,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log return wp } -func (wp *WorkersPool) Queue(msg *message.Message) { +func (wp *WorkersPool) Queue(msg *websocketsv1.Message) { wp.queue <- msg } diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index ef44884a..00c1dd91 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -2,7 +2,7 @@ package websockets import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) @@ -15,7 +15,7 @@ type rpc struct { // Publish ... msg is a proto decoded payload // see: pkg/pubsub/message.fbs -func (r *rpc) Publish(in *message.Messages, ok *bool) error { +func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error { const op = errors.Op("broadcast_publish") // just return in case of nil message @@ -47,7 +47,7 @@ func (r *rpc) Publish(in *message.Messages, ok *bool) error { // PublishAsync ... // see: pkg/pubsub/message.fbs -func (r *rpc) PublishAsync(in *message.Messages, ok *bool) error { +func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error { const op = errors.Op("publish_async") // just return in case of nil message diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index 1bcb3455..760b6951 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -12,13 +12,13 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + payload "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" "github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis" - "github.com/spiral/roadrunner/v2/plugins/kv/payload" "github.com/spiral/roadrunner/v2/plugins/logger" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/stretchr/testify/assert" @@ -107,7 +107,6 @@ func kvSetTest(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - p := &payload.Payload{ Storage: "boltdb-south", Items: []*payload.Item{ @@ -130,7 +129,6 @@ func kvHasTest(t *testing.T) { assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) // WorkerList contains list of workers. - p := &payload.Payload{ Storage: "boltdb-south", Items: []*payload.Item{ diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index 593085b7..4e4c09f1 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -16,7 +16,7 @@ import ( json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -216,9 +216,9 @@ func TestWSRedisAndMemory(t *testing.T) { }() time.Sleep(time.Second * 1) + t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) t.Run("RPCWsMemory", RPCWsMemory) t.Run("RPCWsRedis", RPCWsRedis) - t.Run("RPCWsMemoryPubAsync", RPCWsMemoryPubAsync) stopCh <- struct{}{} @@ -870,8 +870,8 @@ func publish2(command string, broker string, topics ...string) { panic(err) } } -func messageWS(command string, broker string, payload []byte, topics ...string) *message.Message { - return &message.Message{ +func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message { + return &websocketsv1.Message{ Topics: topics, Command: command, Broker: broker, @@ -879,9 +879,9 @@ func messageWS(command string, broker string, payload []byte, topics ...string) } } -func makeMessage(command string, broker string, payload []byte, topics ...string) *message.Messages { - m := &message.Messages{ - Messages: []*message.Message{ +func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Messages { + m := &websocketsv1.Messages{ + Messages: []*websocketsv1.Message{ { Topics: topics, Command: command, -- cgit v1.2.3