summaryrefslogtreecommitdiff
path: root/plugins/websockets/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-01 00:10:31 +0300
committerGitHub <[email protected]>2021-06-01 00:10:31 +0300
commit548ee4432e48b316ada00feec1a6b89e67ae4f2f (patch)
tree5cd2aaeeafdb50e3e46824197c721223f54695bf /plugins/websockets/rpc.go
parent8cd696bbca8fac2ced30d8172c41b7434ec86650 (diff)
parentdf4d316d519cea6dff654bd917521a616a37f769 (diff)
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
Diffstat (limited to 'plugins/websockets/rpc.go')
-rw-r--r--plugins/websockets/rpc.go47
1 files changed, 47 insertions, 0 deletions
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
new file mode 100644
index 00000000..2fb0f1b9
--- /dev/null
+++ b/plugins/websockets/rpc.go
@@ -0,0 +1,47 @@
+package websockets
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// rpc collectors struct
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+func (r *rpc) Publish(msg []*pubsub.Message, ok *bool) error {
+ const op = errors.Op("broadcast_publish")
+ r.log.Debug("message published", "msg", msg)
+
+ // just return in case of nil message
+ if msg == nil {
+ *ok = true
+ return nil
+ }
+
+ err := r.plugin.Publish(msg)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+func (r *rpc) PublishAsync(msg []*pubsub.Message, ok *bool) error {
+ r.log.Debug("message published", "msg", msg)
+
+ // just return in case of nil message
+ if msg == nil {
+ *ok = true
+ return nil
+ }
+ // publish to the registered broker
+ r.plugin.PublishAsync(msg)
+
+ *ok = true
+ return nil
+}