diff options
author | Valery Piashchynski <[email protected]> | 2021-08-26 18:32:51 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-26 18:32:51 +0300 |
commit | efb3efa98c8555815330274f0618bfc080f4c65c (patch) | |
tree | b3bcabdb22fade6ef06d865d60995bc15f84cf1c /plugins/broadcast | |
parent | 3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff) |
Move drivers to the plugin's root.
Fix #771, add tests.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast')
-rw-r--r-- | plugins/broadcast/plugin.go | 63 |
1 files changed, 20 insertions, 43 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 889dc2fa..a2390df5 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" @@ -16,9 +15,6 @@ const ( PluginName string = "broadcast" // driver is the mandatory field which should present in every storage driver string = "driver" - - redis string = "redis" - memory string = "memory" ) type Plugin struct { @@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error { } func (p *Plugin) PublishAsync(m *pubsub.Message) { + // TODO(rustatian) channel here? go func() { p.Lock() defer p.Unlock() @@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { err := p.publishers[j].Publish(m) if err != nil { p.log.Error("publishAsync", "error", err) - // continue publish to other registered publishers + // continue publishing to the other registered publishers continue } } @@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { }() } -func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { const op = errors.Op("broadcast_plugin_get_driver") // choose a driver @@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, key) - switch val.(map[string]interface{})[driver] { - case memory: - if _, ok := p.constructors[memory]; !ok { - return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers)) - } - ps, err := p.constructors[memory].PSConstruct(configKey) - if err != nil { - return nil, errors.E(op, err) - } - - // save the initialized publisher channel - // for the in-memory, register new publishers - p.publishers[uuid.NewString()] = ps + drName := val.(map[string]interface{})[driver] - return ps, nil - case redis: - if _, ok := p.constructors[redis]; !ok { - return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers)) + // driver name should be a string + if drStr, ok := drName.(string); ok { + if _, ok := p.constructors[drStr]; !ok { + return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr)) } - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - ps, err := p.constructors[redis].PSConstruct(configKey) + // try local config first + if p.cfgPlugin.Has(configKey) { + ps, err := p.constructors[drStr].PSConstruct(configKey) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps - return ps, nil - // then try global if local does not exist - case p.cfgPlugin.Has(redis): - ps, err := p.constructors[redis].PSConstruct(configKey) + return ps, nil + } else { + // try global driver section + ps, err := p.constructors[drStr].PSConstruct(drStr) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps + return ps, nil } } |