summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/broadcast/config.go2
-rw-r--r--plugins/broadcast/plugin.go21
-rw-r--r--plugins/kv/plugin.go57
-rw-r--r--plugins/memcached/memcachedkv/driver.go4
-rw-r--r--plugins/memory/memoryjobs/consumer.go4
-rw-r--r--plugins/memory/memorykv/kv.go4
-rw-r--r--plugins/memory/memorypubsub/pubsub.go2
-rw-r--r--plugins/memory/plugin.go2
-rw-r--r--plugins/redis/jobs/config.go34
-rw-r--r--plugins/redis/jobs/consumer.go1
-rw-r--r--plugins/redis/jobs/item.go1
-rw-r--r--plugins/redis/kv/config.go4
-rw-r--r--plugins/redis/kv/kv.go7
-rw-r--r--plugins/redis/pubsub/pubsub.go6
14 files changed, 83 insertions, 66 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
index 4f1e5213..9531025b 100644
--- a/plugins/broadcast/config.go
+++ b/plugins/broadcast/config.go
@@ -3,6 +3,8 @@ package broadcast
/*
# Global redis config (priority - 2)
+default:
+ # redis configuration here
websockets: # <----- one of possible subscribers
path: /ws
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index a2390df5..40263eaa 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -15,6 +15,9 @@ const (
PluginName string = "broadcast"
// driver is the mandatory field which should present in every storage
driver string = "driver"
+
+ // every driver should have config section for the local configuration
+ conf string = "config"
)
type Plugin struct {
@@ -50,7 +53,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
}
func (p *Plugin) Serve() chan error {
- return make(chan error)
+ return make(chan error, 1)
}
func (p *Plugin) Stop() error {
@@ -130,8 +133,8 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation"))
}
- // config key for the particular sub-driver kv.memcached
- configKey := fmt.Sprintf("%s.%s", PluginName, key)
+ // config key for the particular sub-driver broadcast.memcached.config
+ configKey := fmt.Sprintf("%s.%s.%s", PluginName, key, conf)
drName := val.(map[string]interface{})[driver]
@@ -141,8 +144,10 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr))
}
+ switch {
// try local config first
- if p.cfgPlugin.Has(configKey) {
+ case p.cfgPlugin.Has(configKey):
+ // we found a local configuration
ps, err := p.constructors[drStr].PSConstruct(configKey)
if err != nil {
return nil, errors.E(op, err)
@@ -153,9 +158,9 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
p.publishers[configKey] = ps
return ps, nil
- } else {
- // try global driver section
- ps, err := p.constructors[drStr].PSConstruct(drStr)
+ case p.cfgPlugin.Has(key):
+ // try global driver section after local
+ ps, err := p.constructors[drStr].PSConstruct(key)
if err != nil {
return nil, errors.E(op, err)
}
@@ -165,6 +170,8 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
p.publishers[configKey] = ps
return ps, nil
+ default:
+ p.log.Error("can't find local or global configuration, this section will be skipped", "local: ", configKey, "global: ", key)
}
}
}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index a1144b85..86bd982f 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -10,12 +10,13 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
)
-// PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync.
-const PluginName string = "kv"
-
const (
+ // PluginName linked to the memory, boltdb, memcached, redis plugins. DO NOT change w/o sync.
+ PluginName string = "kv"
// driver is the mandatory field which should present in every storage
driver string = "driver"
+ // config key used to detect local configuration for the driver
+ cfg string = "config"
)
// Plugin for the unified storage
@@ -75,28 +76,50 @@ func (p *Plugin) Serve() chan error {
continue
}
- // config key for the particular sub-driver kv.memcached
- configKey := fmt.Sprintf("%s.%s", PluginName, k)
+ // config key for the particular sub-driver kv.memcached.config
+ configKey := fmt.Sprintf("%s.%s.%s", PluginName, k, cfg)
// at this point we know, that driver field present in the configuration
drName := v.(map[string]interface{})[driver]
// driver name should be a string
if drStr, ok := drName.(string); ok {
- if _, ok := p.constructors[drStr]; !ok {
- p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
+ switch {
+ // local configuration section key
+ case p.cfgPlugin.Has(configKey):
+ if _, ok := p.constructors[drStr]; !ok {
+ p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
+ continue
+ }
+
+ storage, err := p.constructors[drStr].KVConstruct(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ // try global then
+ case p.cfgPlugin.Has(k):
+ if _, ok := p.constructors[drStr]; !ok {
+ p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
+ continue
+ }
+
+ // use only key for the driver registration, for example rr-boltdb should be globally available
+ storage, err := p.constructors[drStr].KVConstruct(k)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ default:
+ p.log.Error("can't find local or global configuration, this section will be skipped", "local: ", configKey, "global: ", k)
continue
}
-
- storage, err := p.constructors[drStr].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
}
-
continue
}
diff --git a/plugins/memcached/memcachedkv/driver.go b/plugins/memcached/memcachedkv/driver.go
index 6d5e1802..dcb071b4 100644
--- a/plugins/memcached/memcachedkv/driver.go
+++ b/plugins/memcached/memcachedkv/driver.go
@@ -32,6 +32,10 @@ func NewMemcachedDriver(log logger.Logger, key string, cfgPlugin config.Configur
return nil, errors.E(op, err)
}
+ if s.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
+ }
+
s.cfg.InitDefaults()
m := memcache.New(s.cfg.Addr...)
diff --git a/plugins/memory/memoryjobs/consumer.go b/plugins/memory/memoryjobs/consumer.go
index dacc2848..79246063 100644
--- a/plugins/memory/memoryjobs/consumer.go
+++ b/plugins/memory/memoryjobs/consumer.go
@@ -61,6 +61,10 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
return nil, errors.E(op, err)
}
+ if jb.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", configKey))
+ }
+
if jb.cfg.Prefetch == 0 {
jb.cfg.Prefetch = 100_000
}
diff --git a/plugins/memory/memorykv/kv.go b/plugins/memory/memorykv/kv.go
index 9b3e176c..5383275c 100644
--- a/plugins/memory/memorykv/kv.go
+++ b/plugins/memory/memorykv/kv.go
@@ -33,6 +33,10 @@ func NewInMemoryDriver(key string, log logger.Logger, cfgPlugin config.Configure
return nil, errors.E(op, err)
}
+ if d.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
+ }
+
d.cfg.InitDefaults()
go d.gc()
diff --git a/plugins/memory/memorypubsub/pubsub.go b/plugins/memory/memorypubsub/pubsub.go
index 75122571..231da134 100644
--- a/plugins/memory/memorypubsub/pubsub.go
+++ b/plugins/memory/memorypubsub/pubsub.go
@@ -21,7 +21,7 @@ type PubSubDriver struct {
func NewPubSubDriver(log logger.Logger, _ string) (*PubSubDriver, error) {
ps := &PubSubDriver{
- pushCh: make(chan *pubsub.Message, 10),
+ pushCh: make(chan *pubsub.Message, 100),
storage: bst.NewBST(),
log: log,
}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 515e469a..87e0f84b 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -49,7 +49,7 @@ func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
}
func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
- const op = errors.Op("inmemory_plugin_provide")
+ const op = errors.Op("memory_plugin_construct")
st, err := memorykv.NewInMemoryDriver(key, p.log, p.cfg)
if err != nil {
return nil, errors.E(op, err)
diff --git a/plugins/redis/jobs/config.go b/plugins/redis/jobs/config.go
deleted file mode 100644
index 89d707af..00000000
--- a/plugins/redis/jobs/config.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package jobs
-
-import "time"
-
-type Config struct {
- Addrs []string `mapstructure:"addrs"`
- DB int `mapstructure:"db"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- MasterName string `mapstructure:"master_name"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- MaxRetries int `mapstructure:"max_retries"`
- DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge time.Duration `mapstructure:"max_conn_age"`
- ReadTimeout time.Duration `mapstructure:"read_timeout"`
- WriteTimeout time.Duration `mapstructure:"write_timeout"`
- PoolTimeout time.Duration `mapstructure:"pool_timeout"`
- IdleTimeout time.Duration `mapstructure:"idle_timeout"`
- IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
-}
-
-// InitDefaults initializing fill config with default values
-func (s *Config) InitDefaults() {
- if s.Addrs == nil {
- s.Addrs = []string{"127.0.0.1:6379"} // default addr is pointing to local storage
- }
-}
diff --git a/plugins/redis/jobs/consumer.go b/plugins/redis/jobs/consumer.go
deleted file mode 100644
index 415ac457..00000000
--- a/plugins/redis/jobs/consumer.go
+++ /dev/null
@@ -1 +0,0 @@
-package jobs
diff --git a/plugins/redis/jobs/item.go b/plugins/redis/jobs/item.go
deleted file mode 100644
index 415ac457..00000000
--- a/plugins/redis/jobs/item.go
+++ /dev/null
@@ -1 +0,0 @@
-package jobs
diff --git a/plugins/redis/kv/config.go b/plugins/redis/kv/config.go
index 5b760952..5bd772a9 100644
--- a/plugins/redis/kv/config.go
+++ b/plugins/redis/kv/config.go
@@ -1,6 +1,8 @@
package kv
-import "time"
+import (
+ "time"
+)
type Config struct {
Addrs []string `mapstructure:"addrs"`
diff --git a/plugins/redis/kv/kv.go b/plugins/redis/kv/kv.go
index 3d062fbb..ae55d332 100644
--- a/plugins/redis/kv/kv.go
+++ b/plugins/redis/kv/kv.go
@@ -20,7 +20,7 @@ type Driver struct {
}
func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer) (*Driver, error) {
- const op = errors.Op("new_boltdb_driver")
+ const op = errors.Op("new_redis_driver")
d := &Driver{
log: log,
@@ -32,8 +32,11 @@ func NewRedisDriver(log logger.Logger, key string, cfgPlugin config.Configurer)
return nil, errors.E(op, err)
}
+ if d.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
+ }
+
d.cfg.InitDefaults()
- d.log = log
d.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: d.cfg.Addrs,
diff --git a/plugins/redis/pubsub/pubsub.go b/plugins/redis/pubsub/pubsub.go
index c9ad3d58..3561ef18 100644
--- a/plugins/redis/pubsub/pubsub.go
+++ b/plugins/redis/pubsub/pubsub.go
@@ -13,7 +13,7 @@ import (
type PubSubDriver struct {
sync.RWMutex
- cfg *Config `mapstructure:"redis"`
+ cfg *Config
log logger.Logger
channel *redisChannel
@@ -34,6 +34,10 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
return nil, errors.E(op, err)
}
+ if ps.cfg == nil {
+ return nil, errors.E(op, errors.Errorf("config not found by provided key: %s", key))
+ }
+
ps.cfg.InitDefaults()
ps.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{