diff options
author | Valery Piashchynski <[email protected]> | 2021-06-01 14:57:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-01 14:57:33 +0300 |
commit | 352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (patch) | |
tree | d940de0ee304d3edb60daa35568c3f186dc6a8b5 /pkg/pubsub | |
parent | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (diff) |
- Initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pubsub')
-rw-r--r-- | pkg/pubsub/interface.go | 4 | ||||
-rw-r--r-- | pkg/pubsub/message.fbs | 14 | ||||
-rw-r--r-- | pkg/pubsub/message/Message.go | 118 | ||||
-rw-r--r-- | pkg/pubsub/message/Messages.go | 67 |
4 files changed, 201 insertions, 2 deletions
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go index caf8783f..63a12b34 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/pubsub/interface.go @@ -19,11 +19,11 @@ type Subscriber interface { // Publisher publish one or more messages type Publisher interface { // Publish one or multiple Channel. - Publish(messages []*Message) error + Publish(messages []byte) error // PublishAsync publish message and return immediately // If error occurred it will be printed into the logger - PublishAsync(messages []*Message) + PublishAsync(messages []byte) } // Reader interface should return next message diff --git a/pkg/pubsub/message.fbs b/pkg/pubsub/message.fbs new file mode 100644 index 00000000..7e975894 --- /dev/null +++ b/pkg/pubsub/message.fbs @@ -0,0 +1,14 @@ +namespace message; + +table Message { + command:string; + broker:string; + topics:[string]; + payload:[byte]; +} + +table Messages { + messages:[Message]; +} + +root_type Messages; diff --git a/pkg/pubsub/message/Message.go b/pkg/pubsub/message/Message.go new file mode 100644 index 00000000..26bbd12c --- /dev/null +++ b/pkg/pubsub/message/Message.go @@ -0,0 +1,118 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Message struct { + _tab flatbuffers.Table +} + +func GetRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Message{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessage(buf []byte, offset flatbuffers.UOffsetT) *Message { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Message{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *Message) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Message) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Message) Command() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Message) Broker() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *Message) Topics(j int) []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4)) + } + return nil +} + +func (rcv *Message) TopicsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Message) Payload(j int) int8 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetInt8(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *Message) PayloadLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *Message) MutatePayload(j int, n int8) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateInt8(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func MessageStart(builder *flatbuffers.Builder) { + builder.StartObject(4) +} +func MessageAddCommand(builder *flatbuffers.Builder, command flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(command), 0) +} +func MessageAddBroker(builder *flatbuffers.Builder, broker flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(broker), 0) +} +func MessageAddTopics(builder *flatbuffers.Builder, topics flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(topics), 0) +} +func MessageStartTopicsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessageAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(payload), 0) +} +func MessageStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func MessageEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/pkg/pubsub/message/Messages.go b/pkg/pubsub/message/Messages.go new file mode 100644 index 00000000..633b367d --- /dev/null +++ b/pkg/pubsub/message/Messages.go @@ -0,0 +1,67 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package message + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type Messages struct { + _tab flatbuffers.Table +} + +func GetRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &Messages{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsMessages(buf []byte, offset flatbuffers.UOffsetT) *Messages { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &Messages{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *Messages) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *Messages) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *Messages) Messages(obj *Message, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *Messages) MessagesLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func MessagesStart(builder *flatbuffers.Builder) { + builder.StartObject(1) +} +func MessagesAddMessages(builder *flatbuffers.Builder, messages flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(messages), 0) +} +func MessagesStartMessagesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func MessagesEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} |