summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
committerValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
commitefb3efa98c8555815330274f0618bfc080f4c65c (patch)
treeb3bcabdb22fade6ef06d865d60995bc15f84cf1c
parent3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff)
Move drivers to the plugin's root.
Fix #771, add tests. Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-xpkg/pool/static_pool.go2
-rw-r--r--pkg/pool/supervisor_test.go50
-rw-r--r--pkg/worker_watcher/container/channel/vec.go5
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go78
-rw-r--r--plugins/amqp/amqpjobs/config.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/config.go)0
-rw-r--r--plugins/amqp/amqpjobs/consumer.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/consumer.go)0
-rw-r--r--plugins/amqp/amqpjobs/item.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/item.go)0
-rw-r--r--plugins/amqp/amqpjobs/listener.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/listener.go)0
-rw-r--r--plugins/amqp/amqpjobs/rabbit_init.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go)0
-rw-r--r--plugins/amqp/amqpjobs/redial.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/redial.go)0
-rw-r--r--plugins/amqp/plugin.go (renamed from plugins/jobs/drivers/amqp/plugin.go)2
-rw-r--r--plugins/beanstalk/config.go (renamed from plugins/jobs/drivers/beanstalk/config.go)0
-rw-r--r--plugins/beanstalk/connection.go (renamed from plugins/jobs/drivers/beanstalk/connection.go)0
-rw-r--r--plugins/beanstalk/consumer.go (renamed from plugins/jobs/drivers/beanstalk/consumer.go)0
-rw-r--r--plugins/beanstalk/encode_test.go (renamed from plugins/jobs/drivers/beanstalk/encode_test.go)0
-rw-r--r--plugins/beanstalk/item.go (renamed from plugins/jobs/drivers/beanstalk/item.go)0
-rw-r--r--plugins/beanstalk/listen.go (renamed from plugins/jobs/drivers/beanstalk/listen.go)0
-rw-r--r--plugins/beanstalk/plugin.go (renamed from plugins/jobs/drivers/beanstalk/plugin.go)0
-rw-r--r--plugins/boltdb/boltjobs/listener.go2
-rw-r--r--plugins/broadcast/plugin.go63
-rw-r--r--plugins/ephemeral/consumer.go (renamed from plugins/jobs/drivers/ephemeral/consumer.go)0
-rw-r--r--plugins/ephemeral/item.go (renamed from plugins/jobs/drivers/ephemeral/item.go)0
-rw-r--r--plugins/ephemeral/plugin.go (renamed from plugins/jobs/drivers/ephemeral/plugin.go)0
-rw-r--r--plugins/kv/plugin.go116
-rw-r--r--plugins/memcached/config.go (renamed from plugins/kv/drivers/memcached/config.go)0
-rw-r--r--plugins/memcached/driver.go (renamed from plugins/kv/drivers/memcached/driver.go)0
-rw-r--r--plugins/memcached/plugin.go (renamed from plugins/kv/drivers/memcached/plugin.go)0
-rw-r--r--plugins/sqs/config.go (renamed from plugins/jobs/drivers/sqs/config.go)0
-rw-r--r--plugins/sqs/consumer.go (renamed from plugins/jobs/drivers/sqs/consumer.go)0
-rw-r--r--plugins/sqs/item.go (renamed from plugins/jobs/drivers/sqs/item.go)0
-rw-r--r--plugins/sqs/listener.go (renamed from plugins/jobs/drivers/sqs/listener.go)0
-rw-r--r--plugins/sqs/plugin.go (renamed from plugins/jobs/drivers/sqs/plugin.go)0
-rw-r--r--tests/allocate-failed.php18
-rw-r--r--tests/plugins/jobs/jobs_amqp_test.go2
-rw-r--r--tests/plugins/jobs/jobs_beanstalk_test.go2
-rw-r--r--tests/plugins/jobs/jobs_ephemeral_test.go2
-rw-r--r--tests/plugins/jobs/jobs_general_test.go4
-rw-r--r--tests/plugins/jobs/jobs_sqs_test.go2
-rw-r--r--tests/plugins/jobs/jobs_with_toxics_test.go6
-rw-r--r--tests/plugins/kv/storage_plugin_test.go2
40 files changed, 178 insertions, 178 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 3eb0714f..720ca9da 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -78,7 +78,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
// set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout)
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index d1b24574..14df513e 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "os"
"os/exec"
"testing"
"time"
@@ -361,3 +362,52 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
<-block
p.Destroy(context.Background())
}
+
+func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
+ var cfgExecTTL = &Config{
+ NumWorkers: uint64(2),
+ AllocateTimeout: time.Second * 15,
+ DestroyTimeout: time.Second * 5,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 5 * time.Second,
+ },
+ }
+
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ require.NotNil(t, p)
+
+ time.Sleep(time.Second)
+
+ // should be ok
+ _, err = p.Exec(&payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ require.NoError(t, err)
+
+ // after creating this file, PHP will fail
+ file, err := os.Create("break")
+ require.NoError(t, err)
+
+ time.Sleep(time.Second * 5)
+ assert.NoError(t, file.Close())
+ assert.NoError(t, os.Remove("break"))
+
+ defer func() {
+ if r := recover(); r != nil {
+ assert.Fail(t, "panic should not be fired!")
+ } else {
+ p.Destroy(context.Background())
+ }
+ }()
+}
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 7fb65a92..5605f1e0 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -15,14 +15,11 @@ type Vec struct {
destroy uint64
// channel with the workers
workers chan worker.BaseProcess
-
- len uint64
}
func NewVector(len uint64) *Vec {
vec := &Vec{
destroy: 0,
- len: len,
workers: make(chan worker.BaseProcess, len),
}
@@ -48,7 +45,7 @@ func (v *Vec) Push(w worker.BaseProcess) {
1. TTL is set with no requests during the TTL
2. Violated Get <-> Release operation (how ??)
*/
- for i := uint64(0); i < v.len; i++ {
+ for i := 0; i < len(v.workers); i++ {
/*
We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
*/
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 348be199..bdd91423 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,12 +3,14 @@ package worker_watcher //nolint:stylecheck
import (
"context"
"sync"
+ "sync/atomic"
"time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
+ "github.com/spiral/roadrunner/v2/utils"
)
// Vector interface represents vector container
@@ -30,21 +32,24 @@ type workerWatcher struct {
sync.RWMutex
container Vector
// used to control Destroy stage (that all workers are in the container)
- numWorkers uint64
+ numWorkers *uint64
workers []worker.BaseProcess
- allocator worker.Allocator
- events events.Handler
+ allocator worker.Allocator
+ allocateTimeout time.Duration
+ events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
ww := &workerWatcher{
- container: channel.NewVector(numWorkers),
- numWorkers: numWorkers,
+ container: channel.NewVector(numWorkers),
- workers: make([]worker.BaseProcess, 0, numWorkers),
+ // pass a ptr to the number of workers to avoid blocking in the TTL loop
+ numWorkers: utils.Uint64(numWorkers),
+ allocateTimeout: allocateTimeout,
+ workers: make([]worker.BaseProcess, 0, numWorkers),
allocator: allocator,
events: events,
@@ -128,21 +133,57 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
}
func (ww *workerWatcher) Allocate() error {
- ww.Lock()
const op = errors.Op("worker_watcher_allocate_new")
+
sw, err := ww.allocator()
if err != nil {
- return errors.E(op, errors.WorkerAllocate, err)
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
+ })
+
+ // if no timeout, return error immediately
+ if ww.allocateTimeout == 0 {
+ return errors.E(op, errors.WorkerAllocate, err)
+ }
+
+ tt := time.After(ww.allocateTimeout)
+ for {
+ select {
+ case <-tt:
+ // reduce number of workers
+ atomic.AddUint64(ww.numWorkers, ^uint64(0))
+ // timeout exceed, worker can't be allocated
+ return errors.E(op, errors.WorkerAllocate, err)
+ default:
+ sw, err = ww.allocator()
+ if err != nil {
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
+ })
+ continue
+ }
+
+ // reallocated
+ goto done
+ }
+ }
}
+done:
// add worker to Wait
ww.addToWatch(sw)
+ ww.Lock()
// add new worker to the workers slice (to get information about workers in parallel)
ww.workers = append(ww.workers, sw)
-
- // unlock Allocate mutex
ww.Unlock()
+
// push the worker to the container
ww.Release(sw)
return nil
@@ -160,7 +201,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
for i := 0; i < len(ww.workers); i++ {
if ww.workers[i].Pid() == pid {
ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
- // kill worker
+ // kill worker, just to be sure it's dead
_ = wb.Kill()
return
}
@@ -177,7 +218,7 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) {
}
}
-// Destroy all underlying container (but let them to complete the task)
+// Destroy all underlying container (but let them complete the task)
func (ww *workerWatcher) Destroy(_ context.Context) {
// destroy container, we don't use ww mutex here, since we should be able to push worker
ww.Lock()
@@ -192,7 +233,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
case <-tt.C:
ww.Lock()
// that might be one of the workers is working
- if ww.numWorkers != uint64(len(ww.workers)) {
+ if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
ww.Unlock()
continue
}
@@ -216,6 +257,10 @@ func (ww *workerWatcher) List() []worker.BaseProcess {
ww.RLock()
defer ww.RUnlock()
+ if len(ww.workers) == 0 {
+ return nil
+ }
+
base := make([]worker.BaseProcess, 0, len(ww.workers))
for i := 0; i < len(ww.workers); i++ {
base = append(base, ww.workers[i])
@@ -253,6 +298,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
Event: events.EventPoolError,
Payload: errors.E(op, err),
})
+
+ // no workers at all, panic
+ if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
+ panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
+ }
}
}
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/config.go b/plugins/amqp/amqpjobs/config.go
index ac2f6e53..ac2f6e53 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/config.go
+++ b/plugins/amqp/amqpjobs/config.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go
index 1931ceaa..1931ceaa 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go
+++ b/plugins/amqp/amqpjobs/consumer.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index a8e305ea..a8e305ea 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go
index 0156d55c..0156d55c 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/listener.go
+++ b/plugins/amqp/amqpjobs/listener.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go
index e260fabe..e260fabe 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
+++ b/plugins/amqp/amqpjobs/rabbit_init.go
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
index 0835e3ea..0835e3ea 100644
--- a/plugins/jobs/drivers/amqp/amqpjobs/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/amqp/plugin.go
index 8797d20b..c4f5f1da 100644
--- a/plugins/jobs/drivers/amqp/plugin.go
+++ b/plugins/amqp/plugin.go
@@ -4,8 +4,8 @@ import (
"github.com/spiral/roadrunner/v2/common/jobs"
"github.com/spiral/roadrunner/v2/pkg/events"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
+ "github.com/spiral/roadrunner/v2/plugins/amqp/amqpjobs"
"github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
)
diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/beanstalk/config.go
index a8069f5d..a8069f5d 100644
--- a/plugins/jobs/drivers/beanstalk/config.go
+++ b/plugins/beanstalk/config.go
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/beanstalk/connection.go
index d3241b37..d3241b37 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/beanstalk/connection.go
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/beanstalk/consumer.go
index 5ef89983..5ef89983 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/beanstalk/consumer.go
diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go
index e43207eb..e43207eb 100644
--- a/plugins/jobs/drivers/beanstalk/encode_test.go
+++ b/plugins/beanstalk/encode_test.go
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/beanstalk/item.go
index 0a6cd560..0a6cd560 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/beanstalk/item.go
diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/beanstalk/listen.go
index 6bb159ea..6bb159ea 100644
--- a/plugins/jobs/drivers/beanstalk/listen.go
+++ b/plugins/beanstalk/listen.go
diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/beanstalk/plugin.go
index 529d1474..529d1474 100644
--- a/plugins/jobs/drivers/beanstalk/plugin.go
+++ b/plugins/beanstalk/plugin.go
diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go
index 1f8e6ff1..4a8d6cd9 100644
--- a/plugins/boltdb/boltjobs/listener.go
+++ b/plugins/boltdb/boltjobs/listener.go
@@ -11,7 +11,7 @@ func (c *consumer) listener() {
if err != nil {
panic(err)
}
- //cursor := tx.Cursor()
+ // cursor := tx.Cursor()
err = tx.Commit()
if err != nil {
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 889dc2fa..a2390df5 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -4,7 +4,6 @@ import (
"fmt"
"sync"
- "github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/pubsub"
@@ -16,9 +15,6 @@ const (
PluginName string = "broadcast"
// driver is the mandatory field which should present in every storage
driver string = "driver"
-
- redis string = "redis"
- memory string = "memory"
)
type Plugin struct {
@@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error {
}
func (p *Plugin) PublishAsync(m *pubsub.Message) {
+ // TODO(rustatian) channel here?
go func() {
p.Lock()
defer p.Unlock()
@@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
err := p.publishers[j].Publish(m)
if err != nil {
p.log.Error("publishAsync", "error", err)
- // continue publish to other registered publishers
+ // continue publishing to the other registered publishers
continue
}
}
@@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) {
}()
}
-func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
const op = errors.Op("broadcast_plugin_get_driver")
// choose a driver
@@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, key)
- switch val.(map[string]interface{})[driver] {
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers))
- }
- ps, err := p.constructors[memory].PSConstruct(configKey)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save the initialized publisher channel
- // for the in-memory, register new publishers
- p.publishers[uuid.NewString()] = ps
+ drName := val.(map[string]interface{})[driver]
- return ps, nil
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers))
+ // driver name should be a string
+ if drStr, ok := drName.(string); ok {
+ if _, ok := p.constructors[drStr]; !ok {
+ return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr))
}
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ // try local config first
+ if p.cfgPlugin.Has(configKey) {
+ ps, err := p.constructors[drStr].PSConstruct(configKey)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
- return ps, nil
- // then try global if local does not exist
- case p.cfgPlugin.Has(redis):
- ps, err := p.constructors[redis].PSConstruct(configKey)
+ return ps, nil
+ } else {
+ // try global driver section
+ ps, err := p.constructors[drStr].PSConstruct(drStr)
if err != nil {
return nil, errors.E(op, err)
}
- // if section already exists, return new connection
- if _, ok := p.publishers[configKey]; ok {
- return ps, nil
- }
-
- // if not - initialize a connection
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
p.publishers[configKey] = ps
+
return ps, nil
}
}
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/ephemeral/consumer.go
index 91b8eda9..91b8eda9 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/ephemeral/consumer.go
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/ephemeral/item.go
index 3298424d..3298424d 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/ephemeral/item.go
diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/ephemeral/plugin.go
index 28495abb..28495abb 100644
--- a/plugins/jobs/drivers/ephemeral/plugin.go
+++ b/plugins/ephemeral/plugin.go
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 9a19f96c..c6ca96c3 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -16,11 +16,6 @@ const PluginName string = "kv"
const (
// driver is the mandatory field which should present in every storage
driver string = "driver"
-
- memcached string = "memcached"
- boltdb string = "boltdb"
- redis string = "redis"
- memory string = "memory"
)
// Plugin for the unified storage
@@ -52,40 +47,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (p *Plugin) Serve() chan error { //nolint:gocognit
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("kv_plugin_serve")
// key - storage name in the config
// value - storage
- /*
- For example we can have here 2 storages (but they are not pre-configured)
- for the boltdb and memcached
- We should provide here the actual configs for the all requested storages
- kv:
- boltdb-south:
- driver: boltdb
- dir: "tests/rr-bolt"
- file: "rr.db"
- bucket: "rr"
- permissions: 777
- ttl: 40s
-
- boltdb-north:
- driver: boltdb
- dir: "tests/rr-bolt"
- file: "rr.db"
- bucket: "rr"
- permissions: 777
- ttl: 40s
-
- memcached:
- driver: memcached
- addr: [ "127.0.0.1:11211" ]
-
-
- For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
- when user requests for example boltdb-south, we should provide that particular preconfigured storage
- */
+ // For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
+ // when user requests for example boltdb-south, we should provide that particular preconfigured storage
+
for k, v := range p.cfg.Data {
// for example if the key not properly formatted (yaml)
if v == nil {
@@ -109,30 +78,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// config key for the particular sub-driver kv.memcached
configKey := fmt.Sprintf("%s.%s", PluginName, k)
// at this point we know, that driver field present in the configuration
- // TODO(rustatian): refactor, made generic, with checks like in the broadcast, websockets or jobs
- switch v.(map[string]interface{})[driver] {
- case memcached:
- if _, ok := p.constructors[memcached]; !ok {
- p.log.Warn("no memcached constructors registered", "registered", p.constructors)
- continue
- }
-
- storage, err := p.constructors[memcached].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
+ drName := v.(map[string]interface{})[driver]
- case boltdb:
- if _, ok := p.constructors[boltdb]; !ok {
- p.log.Warn("no boltdb constructors registered", "registered", p.constructors)
+ // driver name should be a string
+ if drStr, ok := drName.(string); ok {
+ if _, ok := p.constructors[drStr]; !ok {
+ p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors)
continue
}
- storage, err := p.constructors[boltdb].KVConstruct(configKey)
+ storage, err := p.constructors[drStr].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -140,56 +95,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- p.log.Warn("no in-memory constructors registered", "registered", p.constructors)
- continue
- }
-
- storage, err := p.constructors[memory].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- p.log.Warn("no redis constructors registered", "registered", p.constructors)
- continue
- }
-
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- storage, err := p.constructors[redis].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- case p.cfgPlugin.Has(redis):
- storage, err := p.constructors[redis].KVConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the storage
- p.storages[k] = storage
- continue
- default:
- // otherwise - error, no local or global config
- p.log.Warn("no global or local redis configuration provided", "key", configKey)
- continue
- }
-
- default:
- p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])))
}
+
+ continue
}
return errCh
diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/memcached/config.go
index 6d413790..6d413790 100644
--- a/plugins/kv/drivers/memcached/config.go
+++ b/plugins/memcached/config.go
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/memcached/driver.go
index e24747fe..e24747fe 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/memcached/driver.go
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/memcached/plugin.go
index 59a2b7cb..59a2b7cb 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/memcached/plugin.go
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/sqs/config.go
index 9b2a1ca8..9b2a1ca8 100644
--- a/plugins/jobs/drivers/sqs/config.go
+++ b/plugins/sqs/config.go
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/sqs/consumer.go
index 23203190..23203190 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/sqs/consumer.go
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/sqs/item.go
index 996adf6c..996adf6c 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/sqs/item.go
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/sqs/listener.go
index a4280af2..a4280af2 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/sqs/listener.go
diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/sqs/plugin.go
index 54f61ff5..54f61ff5 100644
--- a/plugins/jobs/drivers/sqs/plugin.go
+++ b/plugins/sqs/plugin.go
diff --git a/tests/allocate-failed.php b/tests/allocate-failed.php
new file mode 100644
index 00000000..8514ecc0
--- /dev/null
+++ b/tests/allocate-failed.php
@@ -0,0 +1,18 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require __DIR__ . "/vendor/autoload.php";
+
+if (file_exists('break')) {
+ throw new Exception('oops');
+}
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+
+while($rr->waitPayload()){
+ $rr->respond(new \Spiral\RoadRunner\Payload(""));
+}
diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go
index 48d6515d..949698ec 100644
--- a/tests/plugins/jobs/jobs_amqp_test.go
+++ b/tests/plugins/jobs/jobs_amqp_test.go
@@ -14,10 +14,10 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/plugins/amqp"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go
index 8e74c7cc..9f4d37ec 100644
--- a/tests/plugins/jobs/jobs_beanstalk_test.go
+++ b/tests/plugins/jobs/jobs_beanstalk_test.go
@@ -14,10 +14,10 @@ import (
endure "github.com/spiral/endure/pkg/container"
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
+ "github.com/spiral/roadrunner/v2/plugins/beanstalk"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go
index 98590a96..2890aa9d 100644
--- a/tests/plugins/jobs/jobs_ephemeral_test.go
+++ b/tests/plugins/jobs/jobs_ephemeral_test.go
@@ -15,9 +15,9 @@ import (
goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
jobState "github.com/spiral/roadrunner/v2/pkg/state/job"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go
index f0b5697b..91855ee9 100644
--- a/tests/plugins/jobs/jobs_general_test.go
+++ b/tests/plugins/jobs/jobs_general_test.go
@@ -12,11 +12,11 @@ import (
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/plugins/amqp"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral"
"github.com/spiral/roadrunner/v2/plugins/metrics"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go
index 630a059a..95abe9dc 100644
--- a/tests/plugins/jobs/jobs_sqs_test.go
+++ b/tests/plugins/jobs/jobs_sqs_test.go
@@ -17,11 +17,11 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/sqs"
jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta"
"github.com/spiral/roadrunner/v2/tests/mocks"
"github.com/stretchr/testify/assert"
diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go
index f6521e8d..80fed8eb 100644
--- a/tests/plugins/jobs/jobs_with_toxics_test.go
+++ b/tests/plugins/jobs/jobs_with_toxics_test.go
@@ -11,15 +11,15 @@ import (
toxiproxy "github.com/Shopify/toxiproxy/client"
"github.com/golang/mock/gomock"
endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner/v2/plugins/amqp"
+ "github.com/spiral/roadrunner/v2/plugins/beanstalk"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/informer"
"github.com/spiral/roadrunner/v2/plugins/jobs"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk"
- "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs"
"github.com/spiral/roadrunner/v2/plugins/resetter"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/sqs"
"github.com/spiral/roadrunner/v2/tests/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index a09a456c..e757a9e6 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -15,8 +15,8 @@ import (
"github.com/spiral/roadrunner/v2/plugins/boltdb"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memcached"
"github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/redis"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"