summaryrefslogtreecommitdiff
path: root/plugins/broadcast/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast/rpc.go')
-rw-r--r--plugins/broadcast/rpc.go51
1 files changed, 32 insertions, 19 deletions
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
index 4c27cdc3..2ee211f8 100644
--- a/plugins/broadcast/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -2,9 +2,9 @@ package broadcast
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
- "google.golang.org/protobuf/proto"
)
// rpc collectors struct
@@ -14,7 +14,7 @@ type rpc struct {
}
// Publish ... msg is a proto decoded payload
-// see: pkg/pubsub/message.fbs
+// see: root/proto
func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
const op = errors.Op("broadcast_publish")
@@ -28,15 +28,23 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro
msgLen := len(in.GetMessages())
for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- return errors.E(op, err)
- }
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
- err = r.plugin.Publish(bb)
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
+ err := r.plugin.Publish(tmp)
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
}
}
@@ -45,10 +53,8 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro
}
// PublishAsync ...
-// see: pkg/pubsub/message.fbs
+// see: root/proto
func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
- const op = errors.Op("publish_async")
-
// just return in case of nil message
if in == nil {
out.Ok = false
@@ -60,13 +66,20 @@ func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response)
msgLen := len(in.GetMessages())
for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
- }
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
- r.plugin.PublishAsync(bb)
+ r.plugin.PublishAsync(tmp)
+ }
}
out.Ok = true