summaryrefslogtreecommitdiff
path: root/plugins/broadcast/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-05 16:39:22 +0300
committerValery Piashchynski <[email protected]>2021-05-05 16:39:22 +0300
commit4fa94bb7f73a705293c2afd40fc1151a3aaa04e2 (patch)
tree6ffd858cade87600bbd4432f70db22f50c598db0 /plugins/broadcast/plugin.go
parent9ee78f937d5be67058882dd3590f89da35bca239 (diff)
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/plugin.go')
-rw-r--r--plugins/broadcast/plugin.go87
1 files changed, 86 insertions, 1 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 3cedf555..45051a7f 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -1,11 +1,96 @@
package broadcast
+import (
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "broadcast"
+)
type Plugin struct {
+ broker Broker
+
+ log logger.Logger
+ cfg *Config
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("broadcast_plugin_init")
+
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ p.cfg.InitDefaults()
+
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("broadcast_plugin_serve")
+ errCh := make(chan error)
+
+ // if there are no brokers, return nil
+ if p.broker == nil {
+ errCh <- errors.E(op, errors.Str("no broker detected"))
+ return errCh
+ }
+
+ // start the underlying broker
+ go func() {
+ err := p.broker.Serve()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ }
+ }()
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ return nil
}
+// Available interface implementation for the plugin
+func (p * Plugin) Available() {}
-func (p *Plugin) Init() error {
+// Name is endure.Named interface implementation
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectBroker,
+ }
+}
+
+func (p *Plugin) CollectBroker(name endure.Named, broker Broker) {
+ p.broker = broker
+}
+
+func (p *Plugin) RPC() interface{} {
+ // create an RPC service for the collects
+ r := &rpc{
+ log: p.log,
+ svc: p,
+ }
+ return r
+}
+
+func (p *Plugin) Publish(msg []*Message) error {
+ const op = errors.Op("broadcast_plugin_publish")
return nil
}
+