summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
committerValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
commitefb3efa98c8555815330274f0618bfc080f4c65c (patch)
treeb3bcabdb22fade6ef06d865d60995bc15f84cf1c /plugins/broadcast
parent3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff)
Move drivers to the plugin's root.
Fix #771, add tests. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/plugin.go63
1 files changed, 20 insertions, 43 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 889dc2fa..a2390df5 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -4,7 +4,6 @@ import (
"fmt"
"sync"
- "github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
@@ -16,9 +15,6 @@ const (
PluginName string = "broadcast"
// driver is the mandatory field which should present in every storage
driver string = "driver"
-
- redis string = "redis"
- memory string = "memory"
)
type Plugin struct {
@@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error {
}
func (p *Plugin) PublishAsync(m *pubsub.Message) {
+ // TODO(rustatian) channel here?
go func() {
p.Lock()
defer p.Unlock()
@@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
err := p.publishers[j].Publish(m)
if err != nil {
p.log.Error("publishAsync", "error", err)
- // continue publish to other registered publishers
+ // continue publishing to the other registered publishers
continue
}
}
@@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
}()
}
-func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
const op = errors.Op("broadcast_plugin_get_driver")
// choose a driver
@@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, key)
- switch val.(map[string]interface{})[driver] {
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers))
- }
- ps, err := p.constructors[memory].PSConstruct(configKey)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save the initialized publisher channel
- // for the in-memory, register new publishers
- p.publishers[uuid.NewString()] = ps
+ drName := val.(map[string]interface{})[driver]
- return ps, nil
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers))
+ // driver name should be a string
+ if drStr, ok := drName.(string); ok {
+ if _, ok := p.constructors[drStr]; !ok {
+ return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr))
}
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ // try local config first
+ if p.cfgPlugin.Has(configKey) {
+ ps, err := p.constructors[drStr].PSConstruct(configKey)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
- return ps, nil
- // then try global if local does not exist
- case p.cfgPlugin.Has(redis):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ return ps, nil
+ } else {
+ // try global driver section
+ ps, err := p.constructors[drStr].PSConstruct(drStr)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
+
return ps, nil
}
}