diff options
author | Valery Piashchynski <[email protected]> | 2021-06-17 19:24:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-17 19:24:35 +0300 |
commit | 68ff941c4226074206ceed9c30bd95317aa0e9fc (patch) | |
tree | 693306256281cccefb29f4eedb7f617a9022154e /plugins/broadcast/rpc.go | |
parent | 25e0841c6aa5e2686da5b9f74e3d77d3814ff592 (diff) |
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/rpc.go')
-rw-r--r-- | plugins/broadcast/rpc.go | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go new file mode 100644 index 00000000..fa853421 --- /dev/null +++ b/plugins/broadcast/rpc.go @@ -0,0 +1,75 @@ +package broadcast + +import ( + "github.com/spiral/errors" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" +) + +// rpc collectors struct +type rpc struct { + plugin *Plugin + log logger.Logger +} + +// Publish ... msg is a proto decoded payload +// see: pkg/pubsub/message.fbs +func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error { + const op = errors.Op("broadcast_publish") + + // just return in case of nil message + if in == nil { + out.Ok = false + return nil + } + + r.log.Debug("message published", "msg", in.Messages) + + msgLen := len(in.GetMessages()) + + for i := 0; i < msgLen; i++ { + bb, err := proto.Marshal(in.GetMessages()[i]) + if err != nil { + return errors.E(op, err) + } + + err = r.plugin.Publish(bb) + if err != nil { + out.Ok = false + return errors.E(op, err) + } + } + + out.Ok = true + return nil +} + +// PublishAsync ... +// see: pkg/pubsub/message.fbs +func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error { + const op = errors.Op("publish_async") + + // just return in case of nil message + if in == nil { + out.Ok = false + return nil + } + + r.log.Debug("message published", "msg", in.Messages) + + msgLen := len(in.GetMessages()) + + for i := 0; i < msgLen; i++ { + bb, err := proto.Marshal(in.GetMessages()[i]) + if err != nil { + out.Ok = false + return errors.E(op, err) + } + + r.plugin.PublishAsync(bb) + } + + out.Ok = true + return nil +} |