diff options
author | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
commit | 2dd30155de6faaf6005027d5337a840310c827f9 (patch) | |
tree | aa6f0ce2d2db2047b7e729b16dd70d721f4bae55 /plugins/broadcast/rpc.go | |
parent | 25dfc0d837827d0d1c729d323dd651ca6163fe09 (diff) |
- Update redis/memory pubsubs
- Rework internal message bus
- Add new tests for the broadcast plugin and include them into the GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/rpc.go')
-rw-r--r-- | plugins/broadcast/rpc.go | 51 |
1 files changed, 32 insertions, 19 deletions
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go index 4c27cdc3..2ee211f8 100644 --- a/plugins/broadcast/rpc.go +++ b/plugins/broadcast/rpc.go @@ -2,9 +2,9 @@ package broadcast import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" - "google.golang.org/protobuf/proto" ) // rpc collectors struct @@ -14,7 +14,7 @@ type rpc struct { } // Publish ... msg is a proto decoded payload -// see: pkg/pubsub/message.fbs +// see: root/proto func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error { const op = errors.Op("broadcast_publish") @@ -28,15 +28,23 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro msgLen := len(in.GetMessages()) for i := 0; i < msgLen; i++ { - bb, err := proto.Marshal(in.GetMessages()[i]) - if err != nil { - return errors.E(op, err) - } + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } - err = r.plugin.Publish(bb) - if err != nil { - out.Ok = false - return errors.E(op, err) + err := r.plugin.Publish(tmp) + if err != nil { + out.Ok = false + return errors.E(op, err) + } } } @@ -45,10 +53,8 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro } // PublishAsync ... -// see: pkg/pubsub/message.fbs +// see: root/proto func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error { - const op = errors.Op("publish_async") - // just return in case of nil message if in == nil { out.Ok = false @@ -60,13 +66,20 @@ func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) msgLen := len(in.GetMessages()) for i := 0; i < msgLen; i++ { - bb, err := proto.Marshal(in.GetMessages()[i]) - if err != nil { - out.Ok = false - return errors.E(op, err) - } + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } - r.plugin.PublishAsync(bb) + r.plugin.PublishAsync(tmp) + } } out.Ok = true |