summaryrefslogtreecommitdiff
path: root/plugins/broadcast/rpc.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast/rpc.go')
-rw-r--r--plugins/broadcast/rpc.go75
1 files changed, 75 insertions, 0 deletions
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
new file mode 100644
index 00000000..fa853421
--- /dev/null
+++ b/plugins/broadcast/rpc.go
@@ -0,0 +1,75 @@
+package broadcast
+
+import (
+ "github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
+)
+
+// rpc collectors struct
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+// Publish ... msg is a proto decoded payload
+// see: pkg/pubsub/message.fbs
+func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ const op = errors.Op("broadcast_publish")
+
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.Messages)
+
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = r.plugin.Publish(bb)
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
+ }
+
+ out.Ok = true
+ return nil
+}
+
+// PublishAsync ...
+// see: pkg/pubsub/message.fbs
+func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ const op = errors.Op("publish_async")
+
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.Messages)
+
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
+
+ r.plugin.PublishAsync(bb)
+ }
+
+ out.Ok = true
+ return nil
+}