diff options
author | Valery Piashchynski <[email protected]> | 2021-06-02 19:16:36 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-02 19:16:36 +0300 |
commit | a99c14abb333c10a9142cd2f178e001f1b1726fb (patch) | |
tree | ec46ffb3db177f9aacef75d9c7bdcd6d894bf20c /pkg | |
parent | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff) | |
parent | 27295b35e4f2702bf73d8ab10d10b84e527daf2b (diff) |
#698 feat(ws): replace `json` with binary flatbuffers
#698 feat(ws): replace `json` with binary flatbuffers
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/pubsub/interface.go | 8 | ||||
-rw-r--r-- | pkg/pubsub/message.fbs | 14 | ||||
-rw-r--r-- | pkg/pubsub/message.go | 9 | ||||
-rw-r--r-- | pkg/pubsub/message/Message.go | 118 | ||||
-rw-r--r-- | pkg/pubsub/message/Messages.go | 67 |
5 files changed, 204 insertions, 12 deletions
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go index caf8783f..18c1a80c 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/pubsub/interface.go @@ -1,5 +1,7 @@ package pubsub +import "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + // PubSub ... type PubSub interface { Publisher @@ -19,14 +21,14 @@ type Subscriber interface { // Publisher publish one or more messages type Publisher interface { // Publish one or multiple Channel. - Publish(messages []*Message) error + Publish(messages []byte) error // PublishAsync publish message and return immediately // If error occurred it will be printed into the logger - PublishAsync(messages []*Message) + PublishAsync(messages []byte) } // Reader interface should return next message type Reader interface { - Next() (*Message, error) + Next() (*message.Message, error) } diff --git a/pkg/pubsub/message.fbs b/pkg/pubsub/message.fbs new file mode 100644 index 00000000..7e975894 --- /dev/null +++ b/pkg/pubsub/message.fbs @@ -0,0 +1,14 @@ +namespace message; + +table Message { + command:string; + broker:string; + topics:[string]; + payload:[byte]; +} + +table Messages { + messages:[Message]; +} + +root_type Messages; diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go index c17d153b..74348722 100644 --- a/pkg/pubsub/message.go +++ b/pkg/pubsub/message.go @@ -1,9 +1,5 @@ package pubsub -import ( - json "github.com/json-iterator/go" -) - type Message struct { // Command (join, leave, headers) Command string `json:"command"` @@ -17,8 +13,3 @@ type Message struct { // Payload to be broadcasted Payload []byte `json:"payload"` } - -// MarshalBinary needed to marshal message for the redis -func (m *Message) MarshalBinary() ([]byte, error) { - return json.Marshal(m) -} diff --git a/pkg/pubsub/message/Message.go b/pkg/pubsub/message/Message.go new file mode 100644 index 00000000..26bbd12c --- /dev/null +++ b/pkg/pubsub/message/Message.go @@ -0,0 +1,118 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Message struct { + _tab flatbuffers.Table +} + +func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Message{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Message{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Message) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Message) Command() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Message) Broker() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Message) Topics(j int) []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4)) + } + return nil +} + +func (rcv *Message) TopicsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Message) Payload(j int) int8 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Message) PayloadLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Message) MutatePayload(j int, n int8) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func MessageStart(builder *flatbuffers.Builder) { + builder.StartObject(4) +} +func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0) +} +func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0) +} +func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0) +} +func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0) +} +func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go new file mode 100644 index 00000000..633b367d --- /dev/null +++ b/pkg/pubsub/message/Messages.go @@ -0,0 +1,67 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Messages struct { + _tab flatbuffers.Table +} + +func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Messages{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Messages{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Messages) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Messages) Messages(obj *Message, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Messages) MessagesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func MessagesStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0) +} +func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} |