summaryrefslogtreecommitdiff
path: root/plugins/websockets/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/rpc.go')
-rw-r--r--plugins/websockets/rpc.go93
1 files changed, 23 insertions, 70 deletions
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
index 6c2cacb4..00c1dd91 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/websockets/rpc.go
@@ -1,10 +1,10 @@
package websockets
import (
- flatbuffers "github.com/google/flatbuffers/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
)
// rpc collectors struct
@@ -13,39 +13,32 @@ type rpc struct {
log logger.Logger
}
-// Publish ... msg is a flatbuffers decoded payload
+// Publish ... msg is a proto decoded payload
// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(msg []byte, ok *bool) error {
+func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error {
const op = errors.Op("broadcast_publish")
- r.log.Debug("message published")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
+ r.log.Debug("message published", "msg", in.Messages)
- b := flatbuffers.NewBuilder(100)
+ msgLen := len(in.GetMessages())
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
-
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
- err := r.plugin.Publish(bb)
+ err = r.plugin.Publish(bb)
if err != nil {
*ok = false
- b.Reset()
return errors.E(op, err)
}
- b.Reset()
}
*ok = true
@@ -54,68 +47,28 @@ func (r *rpc) Publish(msg []byte, ok *bool) error {
// PublishAsync ...
// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(msg []byte, ok *bool) error {
- r.log.Debug("message published", "msg", msg)
+func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error {
+ const op = errors.Op("publish_async")
// just return in case of nil message
- if msg == nil {
+ if in == nil {
*ok = true
return nil
}
- fbsMsg := message.GetRootAsMessages(msg, 0)
- tmpMsg := &message.Message{}
-
- b := flatbuffers.NewBuilder(100)
+ r.log.Debug("message published", "msg", in.Messages)
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- // init a message
- fbsMsg.Messages(tmpMsg, i)
+ msgLen := len(in.GetMessages())
- // overhead HERE
- orig := serializeMsg(b, tmpMsg)
- bb := make([]byte, len(orig))
- copy(bb, orig)
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
r.plugin.PublishAsync(bb)
- b.Reset()
}
*ok = true
return nil
}
-
-func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte {
- cmdOff := b.CreateByteString(msg.Command())
- brokerOff := b.CreateByteString(msg.Broker())
-
- offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength())
- for j := msg.TopicsLength() - 1; j >= 0; j-- {
- offsets[j] = b.CreateByteString(msg.Topics(j))
- }
-
- message.MessageStartTopicsVector(b, len(offsets))
-
- for j := len(offsets) - 1; j >= 0; j-- {
- b.PrependUOffsetT(offsets[j])
- }
-
- tOff := b.EndVector(len(offsets))
- bb := make([]byte, msg.PayloadLength())
- for i := 0; i < msg.PayloadLength(); i++ {
- bb[i] = byte(msg.Payload(i))
- }
- pOff := b.CreateByteVector(bb)
-
- message.MessageStart(b)
-
- message.MessageAddCommand(b, cmdOff)
- message.MessageAddBroker(b, brokerOff)
- message.MessageAddTopics(b, tOff)
- message.MessageAddPayload(b, pOff)
-
- fOff := message.MessageEnd(b)
- b.Finish(fOff)
-
- return b.FinishedBytes()
-}