diff options
Diffstat (limited to 'plugins/broadcast/rpc.go')
-rw-r--r-- | plugins/broadcast/rpc.go | 87 |
1 files changed, 87 insertions, 0 deletions
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go new file mode 100644 index 00000000..2ee211f8 --- /dev/null +++ b/plugins/broadcast/rpc.go @@ -0,0 +1,87 @@ +package broadcast + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" +) + +// rpc collectors struct +type rpc struct { + plugin *Plugin + log logger.Logger +} + +// Publish ... msg is a proto decoded payload +// see: root/proto +func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error { + const op = errors.Op("broadcast_publish") + + // just return in case of nil message + if in == nil { + out.Ok = false + return nil + } + + r.log.Debug("message published", "msg", in.String()) + msgLen := len(in.GetMessages()) + + for i := 0; i < msgLen; i++ { + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } + + err := r.plugin.Publish(tmp) + if err != nil { + out.Ok = false + return errors.E(op, err) + } + } + } + + out.Ok = true + return nil +} + +// PublishAsync ... +// see: root/proto +func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error { + // just return in case of nil message + if in == nil { + out.Ok = false + return nil + } + + r.log.Debug("message published", "msg", in.GetMessages()) + + msgLen := len(in.GetMessages()) + + for i := 0; i < msgLen; i++ { + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } + + r.plugin.PublishAsync(tmp) + } + } + + out.Ok = true + return nil +} |