diff options
Diffstat (limited to 'plugins/websockets/rpc.go')
-rw-r--r-- | plugins/websockets/rpc.go | 94 |
1 files changed, 84 insertions, 10 deletions
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 2fb0f1b9..6c2cacb4 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -1,8 +1,9 @@ package websockets import ( + flatbuffers "github.com/google/flatbuffers/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" ) @@ -12,9 +13,11 @@ type rpc struct { log logger.Logger } -func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { +// Publish ... msg is a flatbuffers decoded payload +// see: pkg/pubsub/message.fbs +func (r *rpc) Publish(msg []byte, ok *bool) error { const op = errors.Op("broadcast_publish") - r.log.Debug("message published", "msg", msg) + r.log.Debug("message published") // just return in case of nil message if msg == nil { @@ -22,16 +25,36 @@ func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error { return nil } - err := r.plugin.Publish(msg) - if err != nil { - *ok = false - return errors.E(op, err) + fbsMsg := message.GetRootAsMessages(msg, 0) + tmpMsg := &message.Message{} + + b := flatbuffers.NewBuilder(100) + + for i := 0; i < fbsMsg.MessagesLength(); i++ { + // init a message + fbsMsg.Messages(tmpMsg, i) + + // overhead HERE + orig := serializeMsg(b, tmpMsg) + bb := make([]byte, len(orig)) + copy(bb, orig) + + err := r.plugin.Publish(bb) + if err != nil { + *ok = false + b.Reset() + return errors.E(op, err) + } + b.Reset() } + *ok = true return nil } -func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error { +// PublishAsync ... +// see: pkg/pubsub/message.fbs +func (r *rpc) PublishAsync(msg []byte, ok *bool) error { r.log.Debug("message published", "msg", msg) // just return in case of nil message @@ -39,9 +62,60 @@ func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error { *ok = true return nil } - // publish to the registered broker - r.plugin.PublishAsync(msg) + + fbsMsg := message.GetRootAsMessages(msg, 0) + tmpMsg := &message.Message{} + + b := flatbuffers.NewBuilder(100) + + for i := 0; i < fbsMsg.MessagesLength(); i++ { + // init a message + fbsMsg.Messages(tmpMsg, i) + + // overhead HERE + orig := serializeMsg(b, tmpMsg) + bb := make([]byte, len(orig)) + copy(bb, orig) + + r.plugin.PublishAsync(bb) + b.Reset() + } *ok = true return nil } + +func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte { + cmdOff := b.CreateByteString(msg.Command()) + brokerOff := b.CreateByteString(msg.Broker()) + + offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength()) + for j := msg.TopicsLength() - 1; j >= 0; j-- { + offsets[j] = b.CreateByteString(msg.Topics(j)) + } + + message.MessageStartTopicsVector(b, len(offsets)) + + for j := len(offsets) - 1; j >= 0; j-- { + b.PrependUOffsetT(offsets[j]) + } + + tOff := b.EndVector(len(offsets)) + bb := make([]byte, msg.PayloadLength()) + for i := 0; i < msg.PayloadLength(); i++ { + bb[i] = byte(msg.Payload(i)) + } + pOff := b.CreateByteVector(bb) + + message.MessageStart(b) + + message.MessageAddCommand(b, cmdOff) + message.MessageAddBroker(b, brokerOff) + message.MessageAddTopics(b, tOff) + message.MessageAddPayload(b, pOff) + + fOff := message.MessageEnd(b) + b.Finish(fOff) + + return b.FinishedBytes() +} |