summaryrefslogtreecommitdiff
path: root/pkg/pubsub
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-01 14:57:33 +0300
committerValery Piashchynski <[email protected]>2021-06-01 14:57:33 +0300
commit352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (patch)
treed940de0ee304d3edb60daa35568c3f186dc6a8b5 /pkg/pubsub
parent548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff)
- Initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pubsub')
-rw-r--r--pkg/pubsub/interface.go4
-rw-r--r--pkg/pubsub/message.fbs14
-rw-r--r--pkg/pubsub/message/Message.go118
-rw-r--r--pkg/pubsub/message/Messages.go67
4 files changed, 201 insertions, 2 deletions
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index caf8783f..63a12b34 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -19,11 +19,11 @@ type Subscriber interface {
// Publisher publish one or more messages
type Publisher interface {
// Publish one or multiple Channel.
- Publish(messages []*Message) error
+ Publish(messages []byte) error
// PublishAsync publish message and return immediately
// If error occurred it will be printed into the logger
- PublishAsync(messages []*Message)
+ PublishAsync(messages []byte)
}
// Reader interface should return next message
diff --git a/pkg/pubsub/message.fbs b/pkg/pubsub/message.fbs
new file mode 100644
index 00000000..7e975894
--- /dev/null
+++ b/pkg/pubsub/message.fbs
@@ -0,0 +1,14 @@
+namespace message;
+
+table Message {
+ command:string;
+ broker:string;
+ topics:[string];
+ payload:[byte];
+}
+
+table Messages {
+ messages:[Message];
+}
+
+root_type Messages;
diff --git a/pkg/pubsub/message/Message.go b/pkg/pubsub/message/Message.go
new file mode 100644
index 00000000..26bbd12c
--- /dev/null
+++ b/pkg/pubsub/message/Message.go
@@ -0,0 +1,118 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package message
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Message struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Message{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &Message{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Message) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Message) Command() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Message) Broker() []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
+ if o != 0 {
+ return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ }
+ return nil
+}
+
+func (rcv *Message) Topics(j int) []byte {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4))
+ }
+ return nil
+}
+
+func (rcv *Message) TopicsLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func (rcv *Message) Payload(j int) int8 {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1))
+ }
+ return 0
+}
+
+func (rcv *Message) PayloadLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func (rcv *Message) MutatePayload(j int, n int8) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
+ if o != 0 {
+ a := rcv._tab.Vector(o)
+ return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n)
+ }
+ return false
+}
+
+func MessageStart(builder *flatbuffers.Builder) {
+ builder.StartObject(4)
+}
+func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0)
+}
+func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0)
+}
+func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0)
+}
+func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0)
+}
+func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(1, numElems, 1)
+}
+func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}
diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go
new file mode 100644
index 00000000..633b367d
--- /dev/null
+++ b/pkg/pubsub/message/Messages.go
@@ -0,0 +1,67 @@
+// Code generated by the FlatBuffers compiler. DO NOT EDIT.
+
+package message
+
+import (
+ flatbuffers "github.com/google/flatbuffers/go"
+)
+
+type Messages struct {
+ _tab flatbuffers.Table
+}
+
+func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
+ n := flatbuffers.GetUOffsetT(buf[offset:])
+ x := &Messages{}
+ x.Init(buf, n+offset)
+ return x
+}
+
+func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages {
+ n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
+ x := &Messages{}
+ x.Init(buf, n+offset+flatbuffers.SizeUint32)
+ return x
+}
+
+func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) {
+ rcv._tab.Bytes = buf
+ rcv._tab.Pos = i
+}
+
+func (rcv *Messages) Table() flatbuffers.Table {
+ return rcv._tab
+}
+
+func (rcv *Messages) Messages(obj *Message, j int) bool {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ x := rcv._tab.Vector(o)
+ x += flatbuffers.UOffsetT(j) * 4
+ x = rcv._tab.Indirect(x)
+ obj.Init(rcv._tab.Bytes, x)
+ return true
+ }
+ return false
+}
+
+func (rcv *Messages) MessagesLength() int {
+ o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
+ if o != 0 {
+ return rcv._tab.VectorLen(o)
+ }
+ return 0
+}
+
+func MessagesStart(builder *flatbuffers.Builder) {
+ builder.StartObject(1)
+}
+func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) {
+ builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0)
+}
+func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
+ return builder.StartVector(4, numElems, 4)
+}
+func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
+ return builder.EndObject()
+}