summaryrefslogtreecommitdiff
path: root/plugins/broadcast/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
committerValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
commit68ff941c4226074206ceed9c30bd95317aa0e9fc (patch)
tree693306256281cccefb29f4eedb7f617a9022154e /plugins/broadcast/rpc.go
parent25e0841c6aa5e2686da5b9f74e3d77d3814ff592 (diff)
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/rpc.go')
-rw-r--r--plugins/broadcast/rpc.go75
1 files changed, 75 insertions, 0 deletions
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
new file mode 100644
index 00000000..fa853421
--- /dev/null
+++ b/plugins/broadcast/rpc.go
@@ -0,0 +1,75 @@
+package broadcast
+
+import (
+ "github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
+)
+
+// rpc collectors struct
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+// Publish ... msg is a proto decoded payload
+// see: pkg/pubsub/message.fbs
+func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ const op = errors.Op("broadcast_publish")
+
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.Messages)
+
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = r.plugin.Publish(bb)
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
+ }
+
+ out.Ok = true
+ return nil
+}
+
+// PublishAsync ...
+// see: pkg/pubsub/message.fbs
+func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ const op = errors.Op("publish_async")
+
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.Messages)
+
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
+
+ r.plugin.PublishAsync(bb)
+ }
+
+ out.Ok = true
+ return nil
+}