diff options
author | Valery Piashchynski <[email protected]> | 2021-09-10 19:05:37 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-10 19:05:37 +0300 |
commit | e94a80d4586d5a5fedc2edab850c5c8bad93395f (patch) | |
tree | 4272054647725274abb5ee69e355c1e4262760ea /plugins | |
parent | 8fa86886bd5b3c12cf161fb2c1cdd9a2cd53d1bf (diff) |
fix issue with incorrectly parsing local and global configuration
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/broadcast/config.go | 2 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 17 | ||||
-rw-r--r-- | plugins/kv/plugin.go | 54 | ||||
-rw-r--r-- | plugins/memcached/memcachedkv/driver.go | 4 | ||||
-rw-r--r-- | plugins/memory/memoryjobs/consumer.go | 4 | ||||
-rw-r--r-- | plugins/memory/memorykv/kv.go | 4 | ||||
-rw-r--r-- | plugins/memory/plugin.go | 2 | ||||
-rw-r--r-- | plugins/redis/jobs/config.go | 34 | ||||
-rw-r--r-- | plugins/redis/jobs/consumer.go | 1 | ||||
-rw-r--r-- | plugins/redis/jobs/item.go | 1 | ||||
-rw-r--r-- | plugins/redis/kv/config.go | 4 | ||||
-rw-r--r-- | plugins/redis/kv/kv.go | 7 | ||||
-rw-r--r-- | plugins/redis/pubsub/pubsub.go | 6 |
13 files changed, 76 insertions, 64 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go index 4f1e5213..9531025b 100644 --- a/plugins/broadcast/config.go +++ b/plugins/broadcast/config.go @@ -3,6 +3,8 @@ package broadcast /* # Global redis config (priority - 2) +default: + # redis configuration here websockets: # <----- one of possible subscribers path: /ws diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index a2390df5..47b68fe7 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -15,6 +15,9 @@ const ( PluginName string = "broadcast" // driver is the mandatory field which should present in every storage driver string = "driver" + + // every driver should have config section for the local configuration + conf string = "config" ) type Plugin struct { @@ -130,8 +133,8 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation")) } - // config key for the particular sub-driver kv.memcached - configKey := fmt.Sprintf("%s.%s", PluginName, key) + // config key for the particular sub-driver broadcast.memcached.config + configKey := fmt.Sprintf("%s.%s.%s", PluginName, key, conf) drName := val.(map[string]interface{})[driver] @@ -141,8 +144,10 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr)) } + switch { // try local config first - if p.cfgPlugin.Has(configKey) { + case p.cfgPlugin.Has(configKey): + // we found a local configuration ps, err := p.constructors[drStr].PSConstruct(configKey) if err != nil { return nil, errors.E(op, err) @@ -153,9 +158,9 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { p.publishers[configKey] = ps return ps, nil - } else { - // try global driver section - ps, err := p.constructors[drStr].PSConstruct(drStr) + default: + // try global driver section after local + ps, err := p.constructors[drStr].PSConstruct(key) if err != nil { return nil, errors.E(op, err) } diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index a1144b85..a2b36e3e 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -10,12 +10,13 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" ) -// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync. -const PluginName string = "kv" - const ( + // PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync. + PluginName string = "kv" // driver is the mandatory field which should present in every storage driver string = "driver" + // config key used to detect local configuration for the driver + cfg string = "config" ) // Plugin for the unified storage @@ -75,26 +76,45 @@ func (p *Plugin) Serve() chan error { continue } - // config key for the particular sub-driver kv.memcached - configKey := fmt.Sprintf("%s.%s", PluginName, k) + // config key for the particular sub-driver kv.memcached.config + configKey := fmt.Sprintf("%s.%s.%s", PluginName, k, cfg) // at this point we know, that driver field present in the configuration drName := v.(map[string]interface{})[driver] // driver name should be a string if drStr, ok := drName.(string); ok { - if _, ok := p.constructors[drStr]; !ok { - p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors) - continue + switch { + // local configuration section key + case p.cfgPlugin.Has(configKey): + if _, ok := p.constructors[drStr]; !ok { + p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors) + continue + } + + storage, err := p.constructors[drStr].KVConstruct(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage + default: + if _, ok := p.constructors[drStr]; !ok { + p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors) + continue + } + + // use only key for the driver registration, for example rr-boltdb should be globally available + storage, err := p.constructors[drStr].KVConstruct(k) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the storage + p.storages[k] = storage } - - storage, err := p.constructors[drStr].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage } continue diff --git a/plugins/memcached/memcachedkv/driver.go b/plugins/memcached/memcachedkv/driver.go index 6d5e1802..dcb071b4 100644 --- a/plugins/memcached/memcachedkv/driver.go +++ b/plugins/memcached/memcachedkv/driver.go @@ -32,6 +32,10 @@ func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configur return nil, errors.E(op, err) } + if s.cfg == nil { + return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key)) + } + s.cfg.InitDefaults() m := memcache.New(s.cfg.Addr...) diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go index fbdedefe..bebea3ce 100644 --- a/plugins/memory/memoryjobs/consumer.go +++ b/plugins/memory/memoryjobs/consumer.go @@ -61,6 +61,10 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh return nil, errors.E(op, err) } + if jb.cfg == nil { + return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", configKey)) + } + if jb.cfg.Prefetch == 0 { jb.cfg.Prefetch = 100_000 } diff --git a/plugins/memory/memorykv/kv.go b/plugins/memory/memorykv/kv.go index 9b3e176c..5383275c 100644 --- a/plugins/memory/memorykv/kv.go +++ b/plugins/memory/memorykv/kv.go @@ -33,6 +33,10 @@ func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configure return nil, errors.E(op, err) } + if d.cfg == nil { + return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key)) + } + d.cfg.InitDefaults() go d.gc() diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index 515e469a..87e0f84b 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -49,7 +49,7 @@ func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { } func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { - const op = errors.Op("inmemory_plugin_provide") + const op = errors.Op("memory_plugin_construct") st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg) if err != nil { return nil, errors.E(op, err) diff --git a/plugins/redis/jobs/config.go b/plugins/redis/jobs/config.go deleted file mode 100644 index 89d707af..00000000 --- a/plugins/redis/jobs/config.go +++ /dev/null @@ -1,34 +0,0 @@ -package jobs - -import "time" - -type Config struct { - Addrs []string `mapstructure:"addrs"` - DB int `mapstructure:"db"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - MasterName string `mapstructure:"master_name"` - SentinelPassword string `mapstructure:"sentinel_password"` - RouteByLatency bool `mapstructure:"route_by_latency"` - RouteRandomly bool `mapstructure:"route_randomly"` - MaxRetries int `mapstructure:"max_retries"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` - MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` - PoolSize int `mapstructure:"pool_size"` - MinIdleConns int `mapstructure:"min_idle_conns"` - MaxConnAge time.Duration `mapstructure:"max_conn_age"` - ReadTimeout time.Duration `mapstructure:"read_timeout"` - WriteTimeout time.Duration `mapstructure:"write_timeout"` - PoolTimeout time.Duration `mapstructure:"pool_timeout"` - IdleTimeout time.Duration `mapstructure:"idle_timeout"` - IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` - ReadOnly bool `mapstructure:"read_only"` -} - -// InitDefaults initializing fill config with default values -func (s *Config) InitDefaults() { - if s.Addrs == nil { - s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage - } -} diff --git a/plugins/redis/jobs/consumer.go b/plugins/redis/jobs/consumer.go deleted file mode 100644 index 415ac457..00000000 --- a/plugins/redis/jobs/consumer.go +++ /dev/null @@ -1 +0,0 @@ -package jobs diff --git a/plugins/redis/jobs/item.go b/plugins/redis/jobs/item.go deleted file mode 100644 index 415ac457..00000000 --- a/plugins/redis/jobs/item.go +++ /dev/null @@ -1 +0,0 @@ -package jobs diff --git a/plugins/redis/kv/config.go b/plugins/redis/kv/config.go index 5b760952..5bd772a9 100644 --- a/plugins/redis/kv/config.go +++ b/plugins/redis/kv/config.go @@ -1,6 +1,8 @@ package kv -import "time" +import ( + "time" +) type Config struct { Addrs []string `mapstructure:"addrs"` diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go index 3d062fbb..ae55d332 100644 --- a/plugins/redis/kv/kv.go +++ b/plugins/redis/kv/kv.go @@ -20,7 +20,7 @@ type Driver struct { } func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) { - const op = errors.Op("new_boltdb_driver") + const op = errors.Op("new_redis_driver") d := &Driver{ log: log, @@ -32,8 +32,11 @@ func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) return nil, errors.E(op, err) } + if d.cfg == nil { + return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key)) + } + d.cfg.InitDefaults() - d.log = log d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ Addrs: d.cfg.Addrs, diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go index c9ad3d58..3561ef18 100644 --- a/plugins/redis/pubsub/pubsub.go +++ b/plugins/redis/pubsub/pubsub.go @@ -13,7 +13,7 @@ import ( type PubSubDriver struct { sync.RWMutex - cfg *Config `mapstructure:"redis"` + cfg *Config log logger.Logger channel *redisChannel @@ -34,6 +34,10 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, return nil, errors.E(op, err) } + if ps.cfg == nil { + return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key)) + } + ps.cfg.InitDefaults() ps.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{ |