diff options
author | Valery Piashchynski <[email protected]> | 2021-06-18 01:06:16 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-18 01:06:16 +0300 |
commit | fe7bb0fe758d573fe353df028257ed66c6eccf66 (patch) | |
tree | 74392f8e61e96c85f0d8b684cfc08e3fc3664ae9 /plugins/broadcast/plugin.go | |
parent | 68ff941c4226074206ceed9c30bd95317aa0e9fc (diff) |
- Rework main parts
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/plugin.go')
-rw-r--r-- | plugins/broadcast/plugin.go | 135 |
1 files changed, 119 insertions, 16 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 3b771746..c43b2e4c 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -1,25 +1,37 @@ package broadcast import ( + "fmt" "sync" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) -const PluginName string = "broadcast" +const ( + PluginName string = "broadcast" + // driver is the mandatory field which should present in every storage + driver string = "driver" + + redis string = "redis" + memory string = "memory" +) type Plugin struct { sync.RWMutex - log logger.Logger + + cfg *Config + cfgPlugin config.Configurer + log logger.Logger // publishers implement Publisher interface // and able to receive a payload - publishers map[string]pubsub.Publisher + publishers map[string]pubsub.PubSub + providers map[string]pubsub.PSProvider } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { @@ -27,9 +39,95 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } + p.cfg = &Config{} + // unmarshal config section + err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) + if err != nil { + return errors.E(op, err) + } + + p.publishers = make(map[string]pubsub.PubSub) + p.providers = make(map[string]pubsub.PSProvider) - p.publishers = make(map[string]pubsub.Publisher) p.log = log + p.cfgPlugin = cfg + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("broadcast_plugin_serve") + errCh := make(chan error, 1) + + // iterate over config + for k, v := range p.cfg.Data { + if v == nil { + continue + } + + switch t := v.(type) { + // correct type + case map[string]interface{}: + if _, ok := t[driver]; !ok { + errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) + return errCh + } + default: + p.log.Warn("wrong type detected in the configuration, please, check yaml indentation") + continue + } + + // config key for the particular sub-driver kv.memcached + configKey := fmt.Sprintf("%s.%s", PluginName, k) + + switch v.(map[string]interface{})[driver] { + case memory: + if _, ok := p.providers[memory]; !ok { + p.log.Warn("no memory drivers registered", "registered", p.publishers) + continue + } + ps, err := p.providers[memory].PSProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the pubsub + p.publishers[k] = ps + case redis: + if _, ok := p.providers[redis]; !ok { + p.log.Warn("no redis drivers registered", "registered", p.publishers) + continue + } + + // first - try local configuration + switch { + case p.cfgPlugin.Has(configKey): + ps, err := p.providers[redis].PSProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the pubsub + p.publishers[k] = ps + case p.cfgPlugin.Has(redis): + ps, err := p.providers[redis].PSProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the pubsub + p.publishers[k] = ps + continue + } + } + } + + return errCh +} + +func (p *Plugin) Stop() error { return nil } @@ -40,8 +138,9 @@ func (p *Plugin) Collects() []interface{} { } // CollectPublishers collect all plugins who implement pubsub.Publisher interface -func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) { - p.publishers[name.Name()] = subscriber +func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) { + // key redis, value - interface + p.providers[name.Name()] = subscriber } // Publish is an entry point to the websocket PUBSUB @@ -88,21 +187,25 @@ func (p *Plugin) PublishAsync(m []byte) { // Get payload for i := 0; i < len(msg.GetTopics()); i++ { - if br, ok := p.publishers[msg.GetBroker()]; ok { - err := br.Publish(m) - if err != nil { - p.log.Error("publish async error", "error", err) + if len(p.publishers) > 0 { + for j := range p.publishers { + p.publishers[j].PublishAsync(m) } - } else { - p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker()) + return } + p.log.Warn("no publishers registered") } }() } -func (p *Plugin) GetDriver(key string) pubsub.SubReader { - println(key) - return nil +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { + const op = errors.Op("broadcast_plugin_get_driver") + // key - driver, default for example + // we should find `default` in the collected pubsubs providers + if pub, ok := p.publishers[key]; ok { + return pub, nil + } + return nil, errors.E(op, errors.Str("could not find driver by provided key")) } func (p *Plugin) RPC() interface{} { |