diff options
Diffstat (limited to 'plugins/websockets/rpc.go')
-rw-r--r-- | plugins/websockets/rpc.go | 93 |
1 files changed, 23 insertions, 70 deletions
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go index 6c2cacb4..00c1dd91 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/websockets/rpc.go @@ -1,10 +1,10 @@ package websockets import ( - flatbuffers "github.com/google/flatbuffers/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pubsub/message" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" ) // rpc collectors struct @@ -13,39 +13,32 @@ type rpc struct { log logger.Logger } -// Publish ... msg is a flatbuffers decoded payload +// Publish ... msg is a proto decoded payload // see: pkg/pubsub/message.fbs -func (r *rpc) Publish(msg []byte, ok *bool) error { +func (r *rpc) Publish(in *websocketsv1.Messages, ok *bool) error { const op = errors.Op("broadcast_publish") - r.log.Debug("message published") // just return in case of nil message - if msg == nil { + if in == nil { *ok = true return nil } - fbsMsg := message.GetRootAsMessages(msg, 0) - tmpMsg := &message.Message{} + r.log.Debug("message published", "msg", in.Messages) - b := flatbuffers.NewBuilder(100) + msgLen := len(in.GetMessages()) - for i := 0; i < fbsMsg.MessagesLength(); i++ { - // init a message - fbsMsg.Messages(tmpMsg, i) - - // overhead HERE - orig := serializeMsg(b, tmpMsg) - bb := make([]byte, len(orig)) - copy(bb, orig) + for i := 0; i < msgLen; i++ { + bb, err := proto.Marshal(in.GetMessages()[i]) + if err != nil { + return errors.E(op, err) + } - err := r.plugin.Publish(bb) + err = r.plugin.Publish(bb) if err != nil { *ok = false - b.Reset() return errors.E(op, err) } - b.Reset() } *ok = true @@ -54,68 +47,28 @@ func (r *rpc) Publish(msg []byte, ok *bool) error { // PublishAsync ... // see: pkg/pubsub/message.fbs -func (r *rpc) PublishAsync(msg []byte, ok *bool) error { - r.log.Debug("message published", "msg", msg) +func (r *rpc) PublishAsync(in *websocketsv1.Messages, ok *bool) error { + const op = errors.Op("publish_async") // just return in case of nil message - if msg == nil { + if in == nil { *ok = true return nil } - fbsMsg := message.GetRootAsMessages(msg, 0) - tmpMsg := &message.Message{} - - b := flatbuffers.NewBuilder(100) + r.log.Debug("message published", "msg", in.Messages) - for i := 0; i < fbsMsg.MessagesLength(); i++ { - // init a message - fbsMsg.Messages(tmpMsg, i) + msgLen := len(in.GetMessages()) - // overhead HERE - orig := serializeMsg(b, tmpMsg) - bb := make([]byte, len(orig)) - copy(bb, orig) + for i := 0; i < msgLen; i++ { + bb, err := proto.Marshal(in.GetMessages()[i]) + if err != nil { + return errors.E(op, err) + } r.plugin.PublishAsync(bb) - b.Reset() } *ok = true return nil } - -func serializeMsg(b *flatbuffers.Builder, msg *message.Message) []byte { - cmdOff := b.CreateByteString(msg.Command()) - brokerOff := b.CreateByteString(msg.Broker()) - - offsets := make([]flatbuffers.UOffsetT, msg.TopicsLength()) - for j := msg.TopicsLength() - 1; j >= 0; j-- { - offsets[j] = b.CreateByteString(msg.Topics(j)) - } - - message.MessageStartTopicsVector(b, len(offsets)) - - for j := len(offsets) - 1; j >= 0; j-- { - b.PrependUOffsetT(offsets[j]) - } - - tOff := b.EndVector(len(offsets)) - bb := make([]byte, msg.PayloadLength()) - for i := 0; i < msg.PayloadLength(); i++ { - bb[i] = byte(msg.Payload(i)) - } - pOff := b.CreateByteVector(bb) - - message.MessageStart(b) - - message.MessageAddCommand(b, cmdOff) - message.MessageAddBroker(b, brokerOff) - message.MessageAddTopics(b, tOff) - message.MessageAddPayload(b, pOff) - - fOff := message.MessageEnd(b) - b.Finish(fOff) - - return b.FinishedBytes() -} |