summaryrefslogtreecommitdiff
path: root/plugins/websockets/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/rpc.go')
-rw-r--r--plugins/websockets/rpc.go94
1 files changed, 84 insertions, 10 deletions
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 2fb0f1b9..6c2cacb4 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,8 +1,9 @@
package websockets
import (
+ flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
@@ -12,9 +13,11 @@ type rpc struct {
log logger.Logger
}
-func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
+// Publish ... msg is a flatbuffers decoded payload
+// see: pkg/pubsub/message.fbs
+func (r *rpc) Publish(msg []byte, ok *bool) error {
const op = errors.Op("broadcast_publish")
- r.log.Debug("message published", "msg", msg)
+ r.log.Debug("message published")
// just return in case of nil message
if msg == nil {
@@ -22,16 +25,36 @@ func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
return nil
}
- err := r.plugin.Publish(msg)
- if err != nil {
- *ok = false
- return errors.E(op, err)
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ err := r.plugin.Publish(bb)
+ if err != nil {
+ *ok = false
+ b.Reset()
+ return errors.E(op, err)
+ }
+ b.Reset()
}
+
*ok = true
return nil
}
-func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error {
+// PublishAsync ...
+// see: pkg/pubsub/message.fbs
+func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
r.log.Debug("message published", "msg", msg)
// just return in case of nil message
@@ -39,9 +62,60 @@ func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error {
*ok = true
return nil
}
- // publish to the registered broker
- r.plugin.PublishAsync(msg)
+
+ fbsMsg := message.GetRootAsMessages(msg, 0)
+ tmpMsg := &message.Message{}
+
+ b := flatbuffers.NewBuilder(100)
+
+ for i := 0; i < fbsMsg.MessagesLength(); i++ {
+ // init a message
+ fbsMsg.Messages(tmpMsg, i)
+
+ // overhead HERE
+ orig := serializeMsg(b, tmpMsg)
+ bb := make([]byte, len(orig))
+ copy(bb, orig)
+
+ r.plugin.PublishAsync(bb)
+ b.Reset()
+ }
*ok = true
return nil
}
+
+func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
+ cmdOff := b.CreateByteString(msg.Command())
+ brokerOff := b.CreateByteString(msg.Broker())
+
+ offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
+ for j := msg.TopicsLength() - 1; j >= 0; j-- {
+ offsets[j] = b.CreateByteString(msg.Topics(j))
+ }
+
+ message.MessageStartTopicsVector(b, len(offsets))
+
+ for j := len(offsets) - 1; j >= 0; j-- {
+ b.PrependUOffsetT(offsets[j])
+ }
+
+ tOff := b.EndVector(len(offsets))
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ pOff := b.CreateByteVector(bb)
+
+ message.MessageStart(b)
+
+ message.MessageAddCommand(b, cmdOff)
+ message.MessageAddBroker(b, brokerOff)
+ message.MessageAddTopics(b, tOff)
+ message.MessageAddPayload(b, pOff)
+
+ fOff := message.MessageEnd(b)
+ b.Finish(fOff)
+
+ return b.FinishedBytes()
+}