diff options
Diffstat (limited to 'plugins/broadcast/rpc.go')
-rw-r--r-- | plugins/broadcast/rpc.go | 51 |
1 files changed, 32 insertions, 19 deletions
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go index 4c27cdc3..2ee211f8 100644 --- a/plugins/broadcast/rpc.go +++ b/plugins/broadcast/rpc.go @@ -2,9 +2,9 @@ package broadcast import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" - "google.golang.org/protobuf/proto" ) // rpc collectors struct @@ -14,7 +14,7 @@ type rpc struct { } // Publish ... msg is a proto decoded payload -// see: pkg/pubsub/message.fbs +// see: root/proto func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error { const op = errors.Op("broadcast_publish") @@ -28,15 +28,23 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro msgLen := len(in.GetMessages()) for i := 0; i < msgLen; i++ { - bb, err := proto.Marshal(in.GetMessages()[i]) - if err != nil { - return errors.E(op, err) - } + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } - err = r.plugin.Publish(bb) - if err != nil { - out.Ok = false - return errors.E(op, err) + err := r.plugin.Publish(tmp) + if err != nil { + out.Ok = false + return errors.E(op, err) + } } } @@ -45,10 +53,8 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro } // PublishAsync ... -// see: pkg/pubsub/message.fbs +// see: root/proto func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error { - const op = errors.Op("publish_async") - // just return in case of nil message if in == nil { out.Ok = false @@ -60,13 +66,20 @@ func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) msgLen := len(in.GetMessages()) for i := 0; i < msgLen; i++ { - bb, err := proto.Marshal(in.GetMessages()[i]) - if err != nil { - out.Ok = false - return errors.E(op, err) - } + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } - r.plugin.PublishAsync(bb) + r.plugin.PublishAsync(tmp) + } } out.Ok = true |