summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-10 19:05:37 +0300
committerValery Piashchynski <[email protected]>2021-09-10 19:05:37 +0300
commite94a80d4586d5a5fedc2edab850c5c8bad93395f (patch)
tree4272054647725274abb5ee69e355c1e4262760ea /plugins
parent8fa86886bd5b3c12cf161fb2c1cdd9a2cd53d1bf (diff)
fix issue with incorrectly parsing local and global configuration
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/broadcast/config.go2
-rw-r--r--plugins/broadcast/plugin.go17
-rw-r--r--plugins/kv/plugin.go54
-rw-r--r--plugins/memcached/memcachedkv/driver.go4
-rw-r--r--plugins/memory/memoryjobs/consumer.go4
-rw-r--r--plugins/memory/memorykv/kv.go4
-rw-r--r--plugins/memory/plugin.go2
-rw-r--r--plugins/redis/jobs/config.go34
-rw-r--r--plugins/redis/jobs/consumer.go1
-rw-r--r--plugins/redis/jobs/item.go1
-rw-r--r--plugins/redis/kv/config.go4
-rw-r--r--plugins/redis/kv/kv.go7
-rw-r--r--plugins/redis/pubsub/pubsub.go6
13 files changed, 76 insertions, 64 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
index 4f1e5213..9531025b 100644
--- a/plugins/broadcast/config.go
+++ b/plugins/broadcast/config.go
@@ -3,6 +3,8 @@ package broadcast
/*
# Global redis config (priority - 2)
+default:
+ # redis configuration here
websockets: # <----- one of possible subscribers
path: /ws
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index a2390df5..47b68fe7 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -15,6 +15,9 @@ const (
PluginName string = "broadcast"
// driver is the mandatory field which should present in every storage
driver string = "driver"
+
+ // every driver should have config section for the local configuration
+ conf string = "config"
)
type Plugin struct {
@@ -130,8 +133,8 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation"))
}
- // config key for the particular sub-driver kv.memcached
- configKey := fmt.Sprintf("%s.%s", PluginName, key)
+ // config key for the particular sub-driver broadcast.memcached.config
+ configKey := fmt.Sprintf("%s.%s.%s", PluginName, key, conf)
drName := val.(map[string]interface{})[driver]
@@ -141,8 +144,10 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr))
}
+ switch {
// try local config first
- if p.cfgPlugin.Has(configKey) {
+ case p.cfgPlugin.Has(configKey):
+ // we found a local configuration
ps, err := p.constructors[drStr].PSConstruct(configKey)
if err != nil {
return nil, errors.E(op, err)
@@ -153,9 +158,9 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
p.publishers[configKey] = ps
return ps, nil
- } else {
- // try global driver section
- ps, err := p.constructors[drStr].PSConstruct(drStr)
+ default:
+ // try global driver section after local
+ ps, err := p.constructors[drStr].PSConstruct(key)
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index a1144b85..a2b36e3e 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -10,12 +10,13 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync.
-const PluginName string = "kv"
-
const (
+ // PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync.
+ PluginName string = "kv"
// driver is the mandatory field which should present in every storage
driver string = "driver"
+ // config key used to detect local configuration for the driver
+ cfg string = "config"
)
// Plugin for the unified storage
@@ -75,26 +76,45 @@ func (p *Plugin) Serve() chan error {
continue
}
- // config key for the particular sub-driver kv.memcached
- configKey := fmt.Sprintf("%s.%s", PluginName, k)
+ // config key for the particular sub-driver kv.memcached.config
+ configKey := fmt.Sprintf("%s.%s.%s", PluginName, k, cfg)
// at this point we know, that driver field present in the configuration
drName := v.(map[string]interface{})[driver]
// driver name should be a string
if drStr, ok := drName.(string); ok {
- if _, ok := p.constructors[drStr]; !ok {
- p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
- continue
+ switch {
+ // local configuration section key
+ case p.cfgPlugin.Has(configKey):
+ if _, ok := p.constructors[drStr]; !ok {
+ p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
+ continue
+ }
+
+ storage, err := p.constructors[drStr].KVConstruct(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ default:
+ if _, ok := p.constructors[drStr]; !ok {
+ p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
+ continue
+ }
+
+ // use only key for the driver registration, for example rr-boltdb should be globally available
+ storage, err := p.constructors[drStr].KVConstruct(k)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
}
-
- storage, err := p.constructors[drStr].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
}
continue
diff --git a/plugins/memcached/memcachedkv/driver.go b/plugins/memcached/memcachedkv/driver.go
index 6d5e1802..dcb071b4 100644
--- a/plugins/memcached/memcachedkv/driver.go
+++ b/plugins/memcached/memcachedkv/driver.go
@@ -32,6 +32,10 @@ func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configur
return nil, errors.E(op, err)
}
+ if s.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
+ }
+
s.cfg.InitDefaults()
m := memcache.New(s.cfg.Addr...)
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go
index fbdedefe..bebea3ce 100644
--- a/plugins/memory/memoryjobs/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -61,6 +61,10 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
return nil, errors.E(op, err)
}
+ if jb.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", configKey))
+ }
+
if jb.cfg.Prefetch == 0 {
jb.cfg.Prefetch = 100_000
}
diff --git a/plugins/memory/memorykv/kv.go b/plugins/memory/memorykv/kv.go
index 9b3e176c..5383275c 100644
--- a/plugins/memory/memorykv/kv.go
+++ b/plugins/memory/memorykv/kv.go
@@ -33,6 +33,10 @@ func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configure
return nil, errors.E(op, err)
}
+ if d.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
+ }
+
d.cfg.InitDefaults()
go d.gc()
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 515e469a..87e0f84b 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -49,7 +49,7 @@ func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
- const op = errors.Op("inmemory_plugin_provide")
+ const op = errors.Op("memory_plugin_construct")
st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
if err != nil {
return nil, errors.E(op, err)
diff --git a/plugins/redis/jobs/config.go b/plugins/redis/jobs/config.go
deleted file mode 100644
index 89d707af..00000000
--- a/plugins/redis/jobs/config.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package jobs
-
-import "time"
-
-type Config struct {
- Addrs []string `mapstructure:"addrs"`
- DB int `mapstructure:"db"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- MasterName string `mapstructure:"master_name"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- MaxRetries int `mapstructure:"max_retries"`
- DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge time.Duration `mapstructure:"max_conn_age"`
- ReadTimeout time.Duration `mapstructure:"read_timeout"`
- WriteTimeout time.Duration `mapstructure:"write_timeout"`
- PoolTimeout time.Duration `mapstructure:"pool_timeout"`
- IdleTimeout time.Duration `mapstructure:"idle_timeout"`
- IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
-}
-
-// InitDefaults initializing fill config with default values
-func (s *Config) InitDefaults() {
- if s.Addrs == nil {
- s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
- }
-}
diff --git a/plugins/redis/jobs/consumer.go b/plugins/redis/jobs/consumer.go
deleted file mode 100644
index 415ac457..00000000
--- a/plugins/redis/jobs/consumer.go
+++ /dev/null
@@ -1 +0,0 @@
-package jobs
diff --git a/plugins/redis/jobs/item.go b/plugins/redis/jobs/item.go
deleted file mode 100644
index 415ac457..00000000
--- a/plugins/redis/jobs/item.go
+++ /dev/null
@@ -1 +0,0 @@
-package jobs
diff --git a/plugins/redis/kv/config.go b/plugins/redis/kv/config.go
index 5b760952..5bd772a9 100644
--- a/plugins/redis/kv/config.go
+++ b/plugins/redis/kv/config.go
@@ -1,6 +1,8 @@
package kv
-import "time"
+import (
+ "time"
+)
type Config struct {
Addrs []string `mapstructure:"addrs"`
diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go
index 3d062fbb..ae55d332 100644
--- a/plugins/redis/kv/kv.go
+++ b/plugins/redis/kv/kv.go
@@ -20,7 +20,7 @@ type Driver struct {
}
func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
- const op = errors.Op("new_boltdb_driver")
+ const op = errors.Op("new_redis_driver")
d := &Driver{
log: log,
@@ -32,8 +32,11 @@ func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer)
return nil, errors.E(op, err)
}
+ if d.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
+ }
+
d.cfg.InitDefaults()
- d.log = log
d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: d.cfg.Addrs,
diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go
index c9ad3d58..3561ef18 100644
--- a/plugins/redis/pubsub/pubsub.go
+++ b/plugins/redis/pubsub/pubsub.go
@@ -13,7 +13,7 @@ import (
type PubSubDriver struct {
sync.RWMutex
- cfg *Config `mapstructure:"redis"`
+ cfg *Config
log logger.Logger
channel *redisChannel
@@ -34,6 +34,10 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
return nil, errors.E(op, err)
}
+ if ps.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
+ }
+
ps.cfg.InitDefaults()
ps.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{