diff options
author | Valery Piashchynski <[email protected]> | 2021-08-26 18:32:51 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-26 18:32:51 +0300 |
commit | efb3efa98c8555815330274f0618bfc080f4c65c (patch) | |
tree | b3bcabdb22fade6ef06d865d60995bc15f84cf1c /plugins | |
parent | 3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff) |
Move drivers to the plugin's root.
Fix #771, add tests.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/amqp/amqpjobs/config.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/config.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/consumer.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/item.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/item.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/listener.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/listener.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/rabbit_init.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/redial.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/plugin.go (renamed from plugins/jobs/drivers/amqp/plugin.go) | 2 | ||||
-rw-r--r-- | plugins/beanstalk/config.go (renamed from plugins/jobs/drivers/beanstalk/config.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/connection.go (renamed from plugins/jobs/drivers/beanstalk/connection.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/consumer.go (renamed from plugins/jobs/drivers/beanstalk/consumer.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/encode_test.go (renamed from plugins/jobs/drivers/beanstalk/encode_test.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/item.go (renamed from plugins/jobs/drivers/beanstalk/item.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/listen.go (renamed from plugins/jobs/drivers/beanstalk/listen.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/plugin.go (renamed from plugins/jobs/drivers/beanstalk/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 2 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 63 | ||||
-rw-r--r-- | plugins/ephemeral/consumer.go (renamed from plugins/jobs/drivers/ephemeral/consumer.go) | 0 | ||||
-rw-r--r-- | plugins/ephemeral/item.go (renamed from plugins/jobs/drivers/ephemeral/item.go) | 0 | ||||
-rw-r--r-- | plugins/ephemeral/plugin.go (renamed from plugins/jobs/drivers/ephemeral/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/kv/plugin.go | 116 | ||||
-rw-r--r-- | plugins/memcached/config.go (renamed from plugins/kv/drivers/memcached/config.go) | 0 | ||||
-rw-r--r-- | plugins/memcached/driver.go (renamed from plugins/kv/drivers/memcached/driver.go) | 0 | ||||
-rw-r--r-- | plugins/memcached/plugin.go (renamed from plugins/kv/drivers/memcached/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/config.go (renamed from plugins/jobs/drivers/sqs/config.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/consumer.go (renamed from plugins/jobs/drivers/sqs/consumer.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/item.go (renamed from plugins/jobs/drivers/sqs/item.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/listener.go (renamed from plugins/jobs/drivers/sqs/listener.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/plugin.go (renamed from plugins/jobs/drivers/sqs/plugin.go) | 0 |
28 files changed, 34 insertions, 149 deletions
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/config.go b/plugins/amqp/amqpjobs/config.go index ac2f6e53..ac2f6e53 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/config.go +++ b/plugins/amqp/amqpjobs/config.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 1931ceaa..1931ceaa 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index a8e305ea..a8e305ea 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go index 0156d55c..0156d55c 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/listener.go +++ b/plugins/amqp/amqpjobs/listener.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go index e260fabe..e260fabe 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go +++ b/plugins/amqp/amqpjobs/rabbit_init.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 0835e3ea..0835e3ea 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/amqp/plugin.go index 8797d20b..c4f5f1da 100644 --- a/plugins/jobs/drivers/amqp/plugin.go +++ b/plugins/amqp/plugin.go @@ -4,8 +4,8 @@ import ( "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/amqp/amqpjobs" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/beanstalk/config.go index a8069f5d..a8069f5d 100644 --- a/plugins/jobs/drivers/beanstalk/config.go +++ b/plugins/beanstalk/config.go diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/beanstalk/connection.go index d3241b37..d3241b37 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/beanstalk/connection.go diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index 5ef89983..5ef89983 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go index e43207eb..e43207eb 100644 --- a/plugins/jobs/drivers/beanstalk/encode_test.go +++ b/plugins/beanstalk/encode_test.go diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/beanstalk/item.go index 0a6cd560..0a6cd560 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/beanstalk/item.go diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/beanstalk/listen.go index 6bb159ea..6bb159ea 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/beanstalk/listen.go diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/beanstalk/plugin.go index 529d1474..529d1474 100644 --- a/plugins/jobs/drivers/beanstalk/plugin.go +++ b/plugins/beanstalk/plugin.go diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 1f8e6ff1..4a8d6cd9 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -11,7 +11,7 @@ func (c *consumer) listener() { if err != nil { panic(err) } - //cursor := tx.Cursor() + // cursor := tx.Cursor() err = tx.Commit() if err != nil { diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 889dc2fa..a2390df5 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" @@ -16,9 +15,6 @@ const ( PluginName string = "broadcast" // driver is the mandatory field which should present in every storage driver string = "driver" - - redis string = "redis" - memory string = "memory" ) type Plugin struct { @@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error { } func (p *Plugin) PublishAsync(m *pubsub.Message) { + // TODO(rustatian) channel here? go func() { p.Lock() defer p.Unlock() @@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { err := p.publishers[j].Publish(m) if err != nil { p.log.Error("publishAsync", "error", err) - // continue publish to other registered publishers + // continue publishing to the other registered publishers continue } } @@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { }() } -func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { const op = errors.Op("broadcast_plugin_get_driver") // choose a driver @@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, key) - switch val.(map[string]interface{})[driver] { - case memory: - if _, ok := p.constructors[memory]; !ok { - return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers)) - } - ps, err := p.constructors[memory].PSConstruct(configKey) - if err != nil { - return nil, errors.E(op, err) - } - - // save the initialized publisher channel - // for the in-memory, register new publishers - p.publishers[uuid.NewString()] = ps + drName := val.(map[string]interface{})[driver] - return ps, nil - case redis: - if _, ok := p.constructors[redis]; !ok { - return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers)) + // driver name should be a string + if drStr, ok := drName.(string); ok { + if _, ok := p.constructors[drStr]; !ok { + return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr)) } - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - ps, err := p.constructors[redis].PSConstruct(configKey) + // try local config first + if p.cfgPlugin.Has(configKey) { + ps, err := p.constructors[drStr].PSConstruct(configKey) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps - return ps, nil - // then try global if local does not exist - case p.cfgPlugin.Has(redis): - ps, err := p.constructors[redis].PSConstruct(configKey) + return ps, nil + } else { + // try global driver section + ps, err := p.constructors[drStr].PSConstruct(drStr) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps + return ps, nil } } diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/ephemeral/consumer.go index 91b8eda9..91b8eda9 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/ephemeral/consumer.go diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/ephemeral/item.go index 3298424d..3298424d 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/ephemeral/item.go diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/ephemeral/plugin.go index 28495abb..28495abb 100644 --- a/plugins/jobs/drivers/ephemeral/plugin.go +++ b/plugins/ephemeral/plugin.go diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 9a19f96c..c6ca96c3 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -16,11 +16,6 @@ const PluginName string = "kv" const ( // driver is the mandatory field which should present in every storage driver string = "driver" - - memcached string = "memcached" - boltdb string = "boltdb" - redis string = "redis" - memory string = "memory" ) // Plugin for the unified storage @@ -52,40 +47,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return nil } -func (p *Plugin) Serve() chan error { //nolint:gocognit +func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("kv_plugin_serve") // key - storage name in the config // value - storage - /* - For example we can have here 2 storages (but they are not pre-configured) - for the boltdb and memcached - We should provide here the actual configs for the all requested storages - kv: - boltdb-south: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - boltdb-north: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - memcached: - driver: memcached - addr: [ "127.0.0.1:11211" ] - - - For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached - when user requests for example boltdb-south, we should provide that particular preconfigured storage - */ + // For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached + // when user requests for example boltdb-south, we should provide that particular preconfigured storage + for k, v := range p.cfg.Data { // for example if the key not properly formatted (yaml) if v == nil { @@ -109,30 +78,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, k) // at this point we know, that driver field present in the configuration - // TODO(rustatian): refactor, made generic, with checks like in the broadcast, websockets or jobs - switch v.(map[string]interface{})[driver] { - case memcached: - if _, ok := p.constructors[memcached]; !ok { - p.log.Warn("no memcached constructors registered", "registered", p.constructors) - continue - } - - storage, err := p.constructors[memcached].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage + drName := v.(map[string]interface{})[driver] - case boltdb: - if _, ok := p.constructors[boltdb]; !ok { - p.log.Warn("no boltdb constructors registered", "registered", p.constructors) + // driver name should be a string + if drStr, ok := drName.(string); ok { + if _, ok := p.constructors[drStr]; !ok { + p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors) continue } - storage, err := p.constructors[boltdb].KVConstruct(configKey) + storage, err := p.constructors[drStr].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -140,56 +95,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage - case memory: - if _, ok := p.constructors[memory]; !ok { - p.log.Warn("no in-memory constructors registered", "registered", p.constructors) - continue - } - - storage, err := p.constructors[memory].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - case redis: - if _, ok := p.constructors[redis]; !ok { - p.log.Warn("no redis constructors registered", "registered", p.constructors) - continue - } - - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - storage, err := p.constructors[redis].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - case p.cfgPlugin.Has(redis): - storage, err := p.constructors[redis].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - continue - default: - // otherwise - error, no local or global config - p.log.Warn("no global or local redis configuration provided", "key", configKey) - continue - } - - default: - p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))) } + + continue } return errCh diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/memcached/config.go index 6d413790..6d413790 100644 --- a/plugins/kv/drivers/memcached/config.go +++ b/plugins/memcached/config.go diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/memcached/driver.go index e24747fe..e24747fe 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/memcached/driver.go diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/memcached/plugin.go index 59a2b7cb..59a2b7cb 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/memcached/plugin.go diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/sqs/config.go index 9b2a1ca8..9b2a1ca8 100644 --- a/plugins/jobs/drivers/sqs/config.go +++ b/plugins/sqs/config.go diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/sqs/consumer.go index 23203190..23203190 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/sqs/consumer.go diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/sqs/item.go index 996adf6c..996adf6c 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/sqs/item.go diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/sqs/listener.go index a4280af2..a4280af2 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/sqs/listener.go diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/sqs/plugin.go index 54f61ff5..54f61ff5 100644 --- a/plugins/jobs/drivers/sqs/plugin.go +++ b/plugins/sqs/plugin.go |