summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
committerValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
commitefb3efa98c8555815330274f0618bfc080f4c65c (patch)
treeb3bcabdb22fade6ef06d865d60995bc15f84cf1c /plugins
parent3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff)
Move drivers to the plugin's root.
Fix #771, add tests. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/amqp/amqpjobs/config.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/config.go)0
-rw-r--r--plugins/amqp/amqpjobs/consumer.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/consumer.go)0
-rw-r--r--plugins/amqp/amqpjobs/item.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/item.go)0
-rw-r--r--plugins/amqp/amqpjobs/listener.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/listener.go)0
-rw-r--r--plugins/amqp/amqpjobs/rabbit_init.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go)0
-rw-r--r--plugins/amqp/amqpjobs/redial.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/redial.go)0
-rw-r--r--plugins/amqp/plugin.go (renamed from plugins/jobs/drivers/amqp/plugin.go)2
-rw-r--r--plugins/beanstalk/config.go (renamed from plugins/jobs/drivers/beanstalk/config.go)0
-rw-r--r--plugins/beanstalk/connection.go (renamed from plugins/jobs/drivers/beanstalk/connection.go)0
-rw-r--r--plugins/beanstalk/consumer.go (renamed from plugins/jobs/drivers/beanstalk/consumer.go)0
-rw-r--r--plugins/beanstalk/encode_test.go (renamed from plugins/jobs/drivers/beanstalk/encode_test.go)0
-rw-r--r--plugins/beanstalk/item.go (renamed from plugins/jobs/drivers/beanstalk/item.go)0
-rw-r--r--plugins/beanstalk/listen.go (renamed from plugins/jobs/drivers/beanstalk/listen.go)0
-rw-r--r--plugins/beanstalk/plugin.go (renamed from plugins/jobs/drivers/beanstalk/plugin.go)0
-rw-r--r--plugins/boltdb/boltjobs/listener.go2
-rw-r--r--plugins/broadcast/plugin.go63
-rw-r--r--plugins/ephemeral/consumer.go (renamed from plugins/jobs/drivers/ephemeral/consumer.go)0
-rw-r--r--plugins/ephemeral/item.go (renamed from plugins/jobs/drivers/ephemeral/item.go)0
-rw-r--r--plugins/ephemeral/plugin.go (renamed from plugins/jobs/drivers/ephemeral/plugin.go)0
-rw-r--r--plugins/kv/plugin.go116
-rw-r--r--plugins/memcached/config.go (renamed from plugins/kv/drivers/memcached/config.go)0
-rw-r--r--plugins/memcached/driver.go (renamed from plugins/kv/drivers/memcached/driver.go)0
-rw-r--r--plugins/memcached/plugin.go (renamed from plugins/kv/drivers/memcached/plugin.go)0
-rw-r--r--plugins/sqs/config.go (renamed from plugins/jobs/drivers/sqs/config.go)0
-rw-r--r--plugins/sqs/consumer.go (renamed from plugins/jobs/drivers/sqs/consumer.go)0
-rw-r--r--plugins/sqs/item.go (renamed from plugins/jobs/drivers/sqs/item.go)0
-rw-r--r--plugins/sqs/listener.go (renamed from plugins/jobs/drivers/sqs/listener.go)0
-rw-r--r--plugins/sqs/plugin.go (renamed from plugins/jobs/drivers/sqs/plugin.go)0
28 files changed, 34 insertions, 149 deletions
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/config.go b/plugins/amqp/amqpjobs/config.go
index ac2f6e53..ac2f6e53 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/config.go
+++ b/plugins/amqp/amqpjobs/config.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 1931ceaa..1931ceaa 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index a8e305ea..a8e305ea 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go
index 0156d55c..0156d55c 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/listener.go
+++ b/plugins/amqp/amqpjobs/listener.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go
index e260fabe..e260fabe 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
+++ b/plugins/amqp/amqpjobs/rabbit_init.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
index 0835e3ea..0835e3ea 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/amqp/plugin.go
index 8797d20b..c4f5f1da 100644
--- a/plugins/jobs/drivers/amqp/plugin.go
+++ b/plugins/amqp/plugin.go
@@ -4,8 +4,8 @@ import (
"github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/amqp/amqpjobs"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/beanstalk/config.go
index a8069f5d..a8069f5d 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/beanstalk/config.go
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/beanstalk/connection.go
index d3241b37..d3241b37 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/beanstalk/connection.go
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index 5ef89983..5ef89983 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go
index e43207eb..e43207eb 100644
--- a/plugins/jobs/drivers/beanstalk/encode_test.go
+++ b/plugins/beanstalk/encode_test.go
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/beanstalk/item.go
index 0a6cd560..0a6cd560 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/beanstalk/item.go
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/beanstalk/listen.go
index 6bb159ea..6bb159ea 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/beanstalk/listen.go
diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/beanstalk/plugin.go
index 529d1474..529d1474 100644
--- a/plugins/jobs/drivers/beanstalk/plugin.go
+++ b/plugins/beanstalk/plugin.go
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 1f8e6ff1..4a8d6cd9 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -11,7 +11,7 @@ func (c *consumer) listener() {
if err != nil {
panic(err)
}
- //cursor := tx.Cursor()
+ // cursor := tx.Cursor()
err = tx.Commit()
if err != nil {
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 889dc2fa..a2390df5 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -4,7 +4,6 @@ import (
"fmt"
"sync"
- "github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
@@ -16,9 +15,6 @@ const (
PluginName string = "broadcast"
// driver is the mandatory field which should present in every storage
driver string = "driver"
-
- redis string = "redis"
- memory string = "memory"
)
type Plugin struct {
@@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error {
}
func (p *Plugin) PublishAsync(m *pubsub.Message) {
+ // TODO(rustatian) channel here?
go func() {
p.Lock()
defer p.Unlock()
@@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
err := p.publishers[j].Publish(m)
if err != nil {
p.log.Error("publishAsync", "error", err)
- // continue publish to other registered publishers
+ // continue publishing to the other registered publishers
continue
}
}
@@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
}()
}
-func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
const op = errors.Op("broadcast_plugin_get_driver")
// choose a driver
@@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, key)
- switch val.(map[string]interface{})[driver] {
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers))
- }
- ps, err := p.constructors[memory].PSConstruct(configKey)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save the initialized publisher channel
- // for the in-memory, register new publishers
- p.publishers[uuid.NewString()] = ps
+ drName := val.(map[string]interface{})[driver]
- return ps, nil
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers))
+ // driver name should be a string
+ if drStr, ok := drName.(string); ok {
+ if _, ok := p.constructors[drStr]; !ok {
+ return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr))
}
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ // try local config first
+ if p.cfgPlugin.Has(configKey) {
+ ps, err := p.constructors[drStr].PSConstruct(configKey)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
- return ps, nil
- // then try global if local does not exist
- case p.cfgPlugin.Has(redis):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ return ps, nil
+ } else {
+ // try global driver section
+ ps, err := p.constructors[drStr].PSConstruct(drStr)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
+
return ps, nil
}
}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/ephemeral/consumer.go
index 91b8eda9..91b8eda9 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/ephemeral/consumer.go
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/ephemeral/item.go
index 3298424d..3298424d 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/ephemeral/item.go
diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/ephemeral/plugin.go
index 28495abb..28495abb 100644
--- a/plugins/jobs/drivers/ephemeral/plugin.go
+++ b/plugins/ephemeral/plugin.go
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 9a19f96c..c6ca96c3 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -16,11 +16,6 @@ const PluginName string = "kv"
const (
// driver is the mandatory field which should present in every storage
driver string = "driver"
-
- memcached string = "memcached"
- boltdb string = "boltdb"
- redis string = "redis"
- memory string = "memory"
)
// Plugin for the unified storage
@@ -52,40 +47,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (p *Plugin) Serve() chan error { //nolint:gocognit
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("kv_plugin_serve")
// key - storage name in the config
// value - storage
- /*
- For example we can have here 2 storages (but they are not pre-configured)
- for the boltdb and memcached
- We should provide here the actual configs for the all requested storages
- kv:
- boltdb-south:
- driver: boltdb
- dir: "tests/rr-bolt"
- file: "rr.db"
- bucket: "rr"
- permissions: 777
- ttl: 40s
-
- boltdb-north:
- driver: boltdb
- dir: "tests/rr-bolt"
- file: "rr.db"
- bucket: "rr"
- permissions: 777
- ttl: 40s
-
- memcached:
- driver: memcached
- addr: [ "127.0.0.1:11211" ]
-
-
- For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
- when user requests for example boltdb-south, we should provide that particular preconfigured storage
- */
+ // For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
+ // when user requests for example boltdb-south, we should provide that particular preconfigured storage
+
for k, v := range p.cfg.Data {
// for example if the key not properly formatted (yaml)
if v == nil {
@@ -109,30 +78,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
// at this point we know, that driver field present in the configuration
- // TODO(rustatian): refactor, made generic, with checks like in the broadcast, websockets or jobs
- switch v.(map[string]interface{})[driver] {
- case memcached:
- if _, ok := p.constructors[memcached]; !ok {
- p.log.Warn("no memcached constructors registered", "registered", p.constructors)
- continue
- }
-
- storage, err := p.constructors[memcached].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
+ drName := v.(map[string]interface{})[driver]
- case boltdb:
- if _, ok := p.constructors[boltdb]; !ok {
- p.log.Warn("no boltdb constructors registered", "registered", p.constructors)
+ // driver name should be a string
+ if drStr, ok := drName.(string); ok {
+ if _, ok := p.constructors[drStr]; !ok {
+ p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
continue
}
- storage, err := p.constructors[boltdb].KVConstruct(configKey)
+ storage, err := p.constructors[drStr].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -140,56 +95,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- p.log.Warn("no in-memory constructors registered", "registered", p.constructors)
- continue
- }
-
- storage, err := p.constructors[memory].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- p.log.Warn("no redis constructors registered", "registered", p.constructors)
- continue
- }
-
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- storage, err := p.constructors[redis].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- case p.cfgPlugin.Has(redis):
- storage, err := p.constructors[redis].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- continue
- default:
- // otherwise - error, no local or global config
- p.log.Warn("no global or local redis configuration provided", "key", configKey)
- continue
- }
-
- default:
- p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])))
}
+
+ continue
}
return errCh
diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/memcached/config.go
index 6d413790..6d413790 100644
--- a/plugins/kv/drivers/memcached/config.go
+++ b/plugins/memcached/config.go
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/memcached/driver.go
index e24747fe..e24747fe 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/memcached/driver.go
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/memcached/plugin.go
index 59a2b7cb..59a2b7cb 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/memcached/plugin.go
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/sqs/config.go
index 9b2a1ca8..9b2a1ca8 100644
--- a/plugins/jobs/drivers/sqs/config.go
+++ b/plugins/sqs/config.go
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/sqs/consumer.go
index 23203190..23203190 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/sqs/item.go
index 996adf6c..996adf6c 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/sqs/item.go
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/sqs/listener.go
index a4280af2..a4280af2 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/sqs/listener.go
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/sqs/plugin.go
index 54f61ff5..54f61ff5 100644
--- a/plugins/jobs/drivers/sqs/plugin.go
+++ b/plugins/sqs/plugin.go