diff options
author | Valery Piashchynski <[email protected]> | 2021-05-05 16:39:22 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-05 16:39:22 +0300 |
commit | 4fa94bb7f73a705293c2afd40fc1151a3aaa04e2 (patch) | |
tree | 6ffd858cade87600bbd4432f70db22f50c598db0 /plugins/broadcast/plugin.go | |
parent | 9ee78f937d5be67058882dd3590f89da35bca239 (diff) |
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/plugin.go')
-rw-r--r-- | plugins/broadcast/plugin.go | 87 |
1 files changed, 86 insertions, 1 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 3cedf555..45051a7f 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -1,11 +1,96 @@ package broadcast +import ( + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +const ( + PluginName string = "broadcast" +) type Plugin struct { + broker Broker + + log logger.Logger + cfg *Config +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("broadcast_plugin_init") + + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.cfg) + if err != nil { + return errors.E(op, errors.Disabled, err) + } + + p.cfg.InitDefaults() + + p.log = log + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("broadcast_plugin_serve") + errCh := make(chan error) + + // if there are no brokers, return nil + if p.broker == nil { + errCh <- errors.E(op, errors.Str("no broker detected")) + return errCh + } + + // start the underlying broker + go func() { + err := p.broker.Serve() + if err != nil { + errCh <- errors.E(op, err) + } + }() + return errCh +} + +func (p *Plugin) Stop() error { + return nil } +// Available interface implementation for the plugin +func (p * Plugin) Available() {} -func (p *Plugin) Init() error { +// Name is endure.Named interface implementation +func (p *Plugin) Name() string { + return PluginName +} + + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.CollectBroker, + } +} + +func (p *Plugin) CollectBroker(name endure.Named, broker Broker) { + p.broker = broker +} + +func (p *Plugin) RPC() interface{} { + // create an RPC service for the collects + r := &rpc{ + log: p.log, + svc: p, + } + return r +} + +func (p *Plugin) Publish(msg []*Message) error { + const op = errors.Op("broadcast_plugin_publish") return nil } + |