summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-31 15:31:30 +0300
committerGitHub <[email protected]>2021-08-31 15:31:30 +0300
commit83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch)
tree884dd2991acf12826752632b8321410e7cc923ce /plugins/broadcast
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
parent31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff)
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/plugin.go63
1 files changed, 20 insertions, 43 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 889dc2fa..a2390df5 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -4,7 +4,6 @@ import (
"fmt"
"sync"
- "github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
@@ -16,9 +15,6 @@ const (
PluginName string = "broadcast"
// driver is the mandatory field which should present in every storage
driver string = "driver"
-
- redis string = "redis"
- memory string = "memory"
)
type Plugin struct {
@@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error {
}
func (p *Plugin) PublishAsync(m *pubsub.Message) {
+ // TODO(rustatian) channel here?
go func() {
p.Lock()
defer p.Unlock()
@@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
err := p.publishers[j].Publish(m)
if err != nil {
p.log.Error("publishAsync", "error", err)
- // continue publish to other registered publishers
+ // continue publishing to the other registered publishers
continue
}
}
@@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
}()
}
-func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
const op = errors.Op("broadcast_plugin_get_driver")
// choose a driver
@@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, key)
- switch val.(map[string]interface{})[driver] {
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers))
- }
- ps, err := p.constructors[memory].PSConstruct(configKey)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save the initialized publisher channel
- // for the in-memory, register new publishers
- p.publishers[uuid.NewString()] = ps
+ drName := val.(map[string]interface{})[driver]
- return ps, nil
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers))
+ // driver name should be a string
+ if drStr, ok := drName.(string); ok {
+ if _, ok := p.constructors[drStr]; !ok {
+ return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr))
}
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ // try local config first
+ if p.cfgPlugin.Has(configKey) {
+ ps, err := p.constructors[drStr].PSConstruct(configKey)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
- return ps, nil
- // then try global if local does not exist
- case p.cfgPlugin.Has(redis):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ return ps, nil
+ } else {
+ // try global driver section
+ ps, err := p.constructors[drStr].PSConstruct(drStr)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
+
return ps, nil
}
}