diff options
Diffstat (limited to 'plugins/broadcast/plugin.go')
-rw-r--r-- | plugins/broadcast/plugin.go | 192 |
1 files changed, 0 insertions, 192 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go deleted file mode 100644 index 40263eaa..00000000 --- a/plugins/broadcast/plugin.go +++ /dev/null @@ -1,192 +0,0 @@ -package broadcast - -import ( - "fmt" - "sync" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "broadcast" - // driver is the mandatory field which should present in every storage - driver string = "driver" - - // every driver should have config section for the local configuration - conf string = "config" -) - -type Plugin struct { - sync.RWMutex - - cfg *Config - cfgPlugin config.Configurer - log logger.Logger - // publishers implement Publisher interface - // and able to receive a payload - publishers map[string]pubsub.PubSub - constructors map[string]pubsub.Constructor -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("broadcast_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - p.cfg = &Config{} - // unmarshal config section - err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) - if err != nil { - return errors.E(op, err) - } - - p.publishers = make(map[string]pubsub.PubSub) - p.constructors = make(map[string]pubsub.Constructor) - - p.log = log - p.cfgPlugin = cfg - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error, 1) -} - -func (p *Plugin) Stop() error { - return nil -} - -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.CollectPublishers, - } -} - -// CollectPublishers collect all plugins who implement pubsub.Publisher interface -func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) { - // key redis, value - interface - p.constructors[name.Name()] = constructor -} - -// Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(m *pubsub.Message) error { - p.Lock() - defer p.Unlock() - - const op = errors.Op("broadcast_plugin_publish") - - // check if any publisher registered - if len(p.publishers) > 0 { - for j := range p.publishers { - err := p.publishers[j].Publish(m) - if err != nil { - return errors.E(op, err) - } - } - return nil - } else { - p.log.Warn("no publishers registered") - } - - return nil -} - -func (p *Plugin) PublishAsync(m *pubsub.Message) { - // TODO(rustatian) channel here? - go func() { - p.Lock() - defer p.Unlock() - // check if any publisher registered - if len(p.publishers) > 0 { - for j := range p.publishers { - err := p.publishers[j].Publish(m) - if err != nil { - p.log.Error("publishAsync", "error", err) - // continue publishing to the other registered publishers - continue - } - } - } else { - p.log.Warn("no publishers registered") - } - }() -} - -func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { - const op = errors.Op("broadcast_plugin_get_driver") - - // choose a driver - if val, ok := p.cfg.Data[key]; ok { - // check type of the v - // should be a map[string]interface{} - switch t := val.(type) { - // correct type - case map[string]interface{}: - if _, ok := t[driver]; !ok { - panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val))) - } - default: - return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation")) - } - - // config key for the particular sub-driver broadcast.memcached.config - configKey := fmt.Sprintf("%s.%s.%s", PluginName, key, conf) - - drName := val.(map[string]interface{})[driver] - - // driver name should be a string - if drStr, ok := drName.(string); ok { - if _, ok := p.constructors[drStr]; !ok { - return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr)) - } - - switch { - // try local config first - case p.cfgPlugin.Has(configKey): - // we found a local configuration - ps, err := p.constructors[drStr].PSConstruct(configKey) - if err != nil { - return nil, errors.E(op, err) - } - - // save the initialized publisher channel - // for the in-memory, register new publishers - p.publishers[configKey] = ps - - return ps, nil - case p.cfgPlugin.Has(key): - // try global driver section after local - ps, err := p.constructors[drStr].PSConstruct(key) - if err != nil { - return nil, errors.E(op, err) - } - - // save the initialized publisher channel - // for the in-memory, register new publishers - p.publishers[configKey] = ps - - return ps, nil - default: - p.log.Error("can't find local or global configuration, this section will be skipped", "local: ", configKey, "global: ", key) - } - } - } - return nil, errors.E(op, errors.Str("could not find driver by provided key")) -} - -func (p *Plugin) RPC() interface{} { - return &rpc{ - plugin: p, - log: p.log, - } -} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Available() {} |