summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-02 19:16:36 +0300
committerGitHub <[email protected]>2021-06-02 19:16:36 +0300
commita99c14abb333c10a9142cd2f178e001f1b1726fb (patch)
treeec46ffb3db177f9aacef75d9c7bdcd6d894bf20c /pkg
parent548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff)
parent27295b35e4f2702bf73d8ab10d10b84e527daf2b (diff)
#698 feat(ws): replace `json` with binary flatbuffers
#698 feat(ws): replace `json` with binary flatbuffers
Diffstat (limited to 'pkg')
-rw-r--r--pkg/pubsub/interface.go8
-rw-r--r--pkg/pubsub/message.fbs14
-rw-r--r--pkg/pubsub/message.go9
-rw-r--r--pkg/pubsub/message/Message.go118
-rw-r--r--pkg/pubsub/message/Messages.go67
5 files changed, 204 insertions, 12 deletions
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index caf8783f..18c1a80c 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -1,5 +1,7 @@
package pubsub
+import "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+
// PubSub ...
type PubSub interface {
Publisher
@@ -19,14 +21,14 @@ type Subscriber interface {
// Publisher publish one or more messages
type Publisher interface {
// Publish one or multiple Channel.
- Publish(messages []*Message) error
+ Publish(messages []byte) error
// PublishAsync publish message and return immediately
// If error occurred it will be printed into the logger
- PublishAsync(messages []*Message)
+ PublishAsync(messages []byte)
}
// Reader interface should return next message
type Reader interface {
- Next() (*Message, error)
+ Next() (*message.Message, error)
}
diff --git a/pkg/pubsub/message.fbs b/pkg/pubsub/message.fbs
new file mode 100644
index 00000000..7e975894
--- /dev/null
+++ b/pkg/pubsub/message.fbs
@@ -0,0 +1,14 @@
+namespace message;
+
+table Message {
+ command:string;
+ broker:string;
+ topics:[string];
+ payload:[byte];
+}
+
+table Messages {
+ messages:[Message];
+}
+
+root_type Messages;
diff --git a/pkg/pubsub/message.go b/pkg/pubsub/message.go
index c17d153b..74348722 100644
--- a/pkg/pubsub/message.go
+++ b/pkg/pubsub/message.go
@@ -1,9 +1,5 @@
package pubsub
-import (
- json "github.com/json-iterator/go"
-)
-
type Message struct {
// Command (join, leave, headers)
Command string `json:"command"`
@@ -17,8 +13,3 @@ type Message struct {
// Payload to be broadcasted
Payload []byte `json:"payload"`
}
-
-// MarshalBinary needed to marshal message for the redis
-func (m *Message) MarshalBinary() ([]byte, error) {
- return json.Marshal(m)
-}
diff --git a/pkg/pubsub/message/Message.go b/pkg/pubsub/message/Message.go
new file mode 100644
index 00000000..26bbd12c
--- /dev/null
+++ b/pkg/pubsub/message/Message.go
@@ -0,0 +1,118 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package message
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Message struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Message{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &Message{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Message) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Message) Command() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Message) Broker() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Message) Topics(j int) []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4))
+ }
+ return nil
+}
+
+func (rcv *Message) TopicsLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func (rcv *Message) Payload(j int) int8 {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1))
+ }
+ return 0
+}
+
+func (rcv *Message) PayloadLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func (rcv *Message) MutatePayload(j int, n int8) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n)
+ }
+ return false
+}
+
+func MessageStart(builder *flatbuffers.Builder) {
+ builder.StartObject(4)
+}
+func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0)
+}
+func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0)
+}
+func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0)
+}
+func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0)
+}
+func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(1, numElems, 1)
+}
+func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go
new file mode 100644
index 00000000..633b367d
--- /dev/null
+++ b/pkg/pubsub/message/Messages.go
@@ -0,0 +1,67 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package message
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Messages struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Messages{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &Messages{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Messages) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Messages) Messages(obj *Message, j int) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ x := rcv._tab.Vector(o)
+ x += flatbuffers.UOffsetT(j) * 4
+ x = rcv._tab.Indirect(x)
+ obj.Init(rcv._tab.Bytes, x)
+ return true
+ }
+ return false
+}
+
+func (rcv *Messages) MessagesLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func MessagesStart(builder *flatbuffers.Builder) {
+ builder.StartObject(1)
+}
+func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0)
+}
+func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}