diff options
author | Valery Piashchynski <[email protected]> | 2021-08-26 18:32:51 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-26 18:32:51 +0300 |
commit | efb3efa98c8555815330274f0618bfc080f4c65c (patch) | |
tree | b3bcabdb22fade6ef06d865d60995bc15f84cf1c | |
parent | 3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff) |
Move drivers to the plugin's root.
Fix #771, add tests.
Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-x | pkg/pool/static_pool.go | 2 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 50 | ||||
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 5 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 78 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/config.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/config.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/consumer.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/consumer.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/item.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/item.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/listener.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/listener.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/rabbit_init.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go (renamed from plugins/jobs/drivers/amqp/amqpjobs/redial.go) | 0 | ||||
-rw-r--r-- | plugins/amqp/plugin.go (renamed from plugins/jobs/drivers/amqp/plugin.go) | 2 | ||||
-rw-r--r-- | plugins/beanstalk/config.go (renamed from plugins/jobs/drivers/beanstalk/config.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/connection.go (renamed from plugins/jobs/drivers/beanstalk/connection.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/consumer.go (renamed from plugins/jobs/drivers/beanstalk/consumer.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/encode_test.go (renamed from plugins/jobs/drivers/beanstalk/encode_test.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/item.go (renamed from plugins/jobs/drivers/beanstalk/item.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/listen.go (renamed from plugins/jobs/drivers/beanstalk/listen.go) | 0 | ||||
-rw-r--r-- | plugins/beanstalk/plugin.go (renamed from plugins/jobs/drivers/beanstalk/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/boltdb/boltjobs/listener.go | 2 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 63 | ||||
-rw-r--r-- | plugins/ephemeral/consumer.go (renamed from plugins/jobs/drivers/ephemeral/consumer.go) | 0 | ||||
-rw-r--r-- | plugins/ephemeral/item.go (renamed from plugins/jobs/drivers/ephemeral/item.go) | 0 | ||||
-rw-r--r-- | plugins/ephemeral/plugin.go (renamed from plugins/jobs/drivers/ephemeral/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/kv/plugin.go | 116 | ||||
-rw-r--r-- | plugins/memcached/config.go (renamed from plugins/kv/drivers/memcached/config.go) | 0 | ||||
-rw-r--r-- | plugins/memcached/driver.go (renamed from plugins/kv/drivers/memcached/driver.go) | 0 | ||||
-rw-r--r-- | plugins/memcached/plugin.go (renamed from plugins/kv/drivers/memcached/plugin.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/config.go (renamed from plugins/jobs/drivers/sqs/config.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/consumer.go (renamed from plugins/jobs/drivers/sqs/consumer.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/item.go (renamed from plugins/jobs/drivers/sqs/item.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/listener.go (renamed from plugins/jobs/drivers/sqs/listener.go) | 0 | ||||
-rw-r--r-- | plugins/sqs/plugin.go (renamed from plugins/jobs/drivers/sqs/plugin.go) | 0 | ||||
-rw-r--r-- | tests/allocate-failed.php | 18 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_amqp_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_beanstalk_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_ephemeral_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_general_test.go | 4 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_sqs_test.go | 2 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_with_toxics_test.go | 6 | ||||
-rw-r--r-- | tests/plugins/kv/storage_plugin_test.go | 2 |
40 files changed, 178 insertions, 178 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 3eb0714f..720ca9da 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -78,7 +78,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index d1b24574..14df513e 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -2,6 +2,7 @@ package pool import ( "context" + "os" "os/exec" "testing" "time" @@ -361,3 +362,52 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { <-block p.Destroy(context.Background()) } + +func TestSupervisedPool_AllocateFailedOK(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(2), + AllocateTimeout: time.Second * 15, + DestroyTimeout: time.Second * 5, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 5 * time.Second, + }, + } + + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + require.NotNil(t, p) + + time.Sleep(time.Second) + + // should be ok + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + require.NoError(t, err) + + // after creating this file, PHP will fail + file, err := os.Create("break") + require.NoError(t, err) + + time.Sleep(time.Second * 5) + assert.NoError(t, file.Close()) + assert.NoError(t, os.Remove("break")) + + defer func() { + if r := recover(); r != nil { + assert.Fail(t, "panic should not be fired!") + } else { + p.Destroy(context.Background()) + } + }() +} diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index 7fb65a92..5605f1e0 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -15,14 +15,11 @@ type Vec struct { destroy uint64 // channel with the workers workers chan worker.BaseProcess - - len uint64 } func NewVector(len uint64) *Vec { vec := &Vec{ destroy: 0, - len: len, workers: make(chan worker.BaseProcess, len), } @@ -48,7 +45,7 @@ func (v *Vec) Push(w worker.BaseProcess) { 1. TTL is set with no requests during the TTL 2. Violated Get <-> Release operation (how ??) */ - for i := uint64(0); i < v.len; i++ { + for i := 0; i < len(v.workers); i++ { /* We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. */ diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 348be199..bdd91423 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -3,12 +3,14 @@ package worker_watcher //nolint:stylecheck import ( "context" "sync" + "sync/atomic" "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel" + "github.com/spiral/roadrunner/v2/utils" ) // Vector interface represents vector container @@ -30,21 +32,24 @@ type workerWatcher struct { sync.RWMutex container Vector // used to control Destroy stage (that all workers are in the container) - numWorkers uint64 + numWorkers *uint64 workers []worker.BaseProcess - allocator worker.Allocator - events events.Handler + allocator worker.Allocator + allocateTimeout time.Duration + events events.Handler } // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher { ww := &workerWatcher{ - container: channel.NewVector(numWorkers), - numWorkers: numWorkers, + container: channel.NewVector(numWorkers), - workers: make([]worker.BaseProcess, 0, numWorkers), + // pass a ptr to the number of workers to avoid blocking in the TTL loop + numWorkers: utils.Uint64(numWorkers), + allocateTimeout: allocateTimeout, + workers: make([]worker.BaseProcess, 0, numWorkers), allocator: allocator, events: events, @@ -128,21 +133,57 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { } func (ww *workerWatcher) Allocate() error { - ww.Lock() const op = errors.Op("worker_watcher_allocate_new") + sw, err := ww.allocator() if err != nil { - return errors.E(op, errors.WorkerAllocate, err) + // log incident + ww.events.Push( + events.WorkerEvent{ + Event: events.EventWorkerError, + Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)), + }) + + // if no timeout, return error immediately + if ww.allocateTimeout == 0 { + return errors.E(op, errors.WorkerAllocate, err) + } + + tt := time.After(ww.allocateTimeout) + for { + select { + case <-tt: + // reduce number of workers + atomic.AddUint64(ww.numWorkers, ^uint64(0)) + // timeout exceed, worker can't be allocated + return errors.E(op, errors.WorkerAllocate, err) + default: + sw, err = ww.allocator() + if err != nil { + // log incident + ww.events.Push( + events.WorkerEvent{ + Event: events.EventWorkerError, + Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)), + }) + continue + } + + // reallocated + goto done + } + } } +done: // add worker to Wait ww.addToWatch(sw) + ww.Lock() // add new worker to the workers slice (to get information about workers in parallel) ww.workers = append(ww.workers, sw) - - // unlock Allocate mutex ww.Unlock() + // push the worker to the container ww.Release(sw) return nil @@ -160,7 +201,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { for i := 0; i < len(ww.workers); i++ { if ww.workers[i].Pid() == pid { ww.workers = append(ww.workers[:i], ww.workers[i+1:]...) - // kill worker + // kill worker, just to be sure it's dead _ = wb.Kill() return } @@ -177,7 +218,7 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) { } } -// Destroy all underlying container (but let them to complete the task) +// Destroy all underlying container (but let them complete the task) func (ww *workerWatcher) Destroy(_ context.Context) { // destroy container, we don't use ww mutex here, since we should be able to push worker ww.Lock() @@ -192,7 +233,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) { case <-tt.C: ww.Lock() // that might be one of the workers is working - if ww.numWorkers != uint64(len(ww.workers)) { + if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) { ww.Unlock() continue } @@ -216,6 +257,10 @@ func (ww *workerWatcher) List() []worker.BaseProcess { ww.RLock() defer ww.RUnlock() + if len(ww.workers) == 0 { + return nil + } + base := make([]worker.BaseProcess, 0, len(ww.workers)) for i := 0; i < len(ww.workers); i++ { base = append(base, ww.workers[i]) @@ -253,6 +298,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { Event: events.EventPoolError, Payload: errors.E(op, err), }) + + // no workers at all, panic + if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 { + panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err))) + } } } diff --git a/plugins/jobs/drivers/amqp/amqpjobs/config.go b/plugins/amqp/amqpjobs/config.go index ac2f6e53..ac2f6e53 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/config.go +++ b/plugins/amqp/amqpjobs/config.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go b/plugins/amqp/amqpjobs/consumer.go index 1931ceaa..1931ceaa 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/consumer.go +++ b/plugins/amqp/amqpjobs/consumer.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index a8e305ea..a8e305ea 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/listener.go b/plugins/amqp/amqpjobs/listener.go index 0156d55c..0156d55c 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/listener.go +++ b/plugins/amqp/amqpjobs/listener.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go index e260fabe..e260fabe 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go +++ b/plugins/amqp/amqpjobs/rabbit_init.go diff --git a/plugins/jobs/drivers/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 0835e3ea..0835e3ea 100644 --- a/plugins/jobs/drivers/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go diff --git a/plugins/jobs/drivers/amqp/plugin.go b/plugins/amqp/plugin.go index 8797d20b..c4f5f1da 100644 --- a/plugins/jobs/drivers/amqp/plugin.go +++ b/plugins/amqp/plugin.go @@ -4,8 +4,8 @@ import ( "github.com/spiral/roadrunner/v2/common/jobs" "github.com/spiral/roadrunner/v2/pkg/events" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" + "github.com/spiral/roadrunner/v2/plugins/amqp/amqpjobs" "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp/amqpjobs" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" ) diff --git a/plugins/jobs/drivers/beanstalk/config.go b/plugins/beanstalk/config.go index a8069f5d..a8069f5d 100644 --- a/plugins/jobs/drivers/beanstalk/config.go +++ b/plugins/beanstalk/config.go diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/beanstalk/connection.go index d3241b37..d3241b37 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/beanstalk/connection.go diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/beanstalk/consumer.go index 5ef89983..5ef89983 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/beanstalk/consumer.go diff --git a/plugins/jobs/drivers/beanstalk/encode_test.go b/plugins/beanstalk/encode_test.go index e43207eb..e43207eb 100644 --- a/plugins/jobs/drivers/beanstalk/encode_test.go +++ b/plugins/beanstalk/encode_test.go diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/beanstalk/item.go index 0a6cd560..0a6cd560 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/beanstalk/item.go diff --git a/plugins/jobs/drivers/beanstalk/listen.go b/plugins/beanstalk/listen.go index 6bb159ea..6bb159ea 100644 --- a/plugins/jobs/drivers/beanstalk/listen.go +++ b/plugins/beanstalk/listen.go diff --git a/plugins/jobs/drivers/beanstalk/plugin.go b/plugins/beanstalk/plugin.go index 529d1474..529d1474 100644 --- a/plugins/jobs/drivers/beanstalk/plugin.go +++ b/plugins/beanstalk/plugin.go diff --git a/plugins/boltdb/boltjobs/listener.go b/plugins/boltdb/boltjobs/listener.go index 1f8e6ff1..4a8d6cd9 100644 --- a/plugins/boltdb/boltjobs/listener.go +++ b/plugins/boltdb/boltjobs/listener.go @@ -11,7 +11,7 @@ func (c *consumer) listener() { if err != nil { panic(err) } - //cursor := tx.Cursor() + // cursor := tx.Cursor() err = tx.Commit() if err != nil { diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 889dc2fa..a2390df5 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/pubsub" @@ -16,9 +15,6 @@ const ( PluginName string = "broadcast" // driver is the mandatory field which should present in every storage driver string = "driver" - - redis string = "redis" - memory string = "memory" ) type Plugin struct { @@ -97,6 +93,7 @@ func (p *Plugin) Publish(m *pubsub.Message) error { } func (p *Plugin) PublishAsync(m *pubsub.Message) { + // TODO(rustatian) channel here? go func() { p.Lock() defer p.Unlock() @@ -106,7 +103,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { err := p.publishers[j].Publish(m) if err != nil { p.log.Error("publishAsync", "error", err) - // continue publish to other registered publishers + // continue publishing to the other registered publishers continue } } @@ -116,7 +113,7 @@ func (p *Plugin) PublishAsync(m *pubsub.Message) { }() } -func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { const op = errors.Op("broadcast_plugin_get_driver") // choose a driver @@ -136,57 +133,37 @@ func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:goco // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, key) - switch val.(map[string]interface{})[driver] { - case memory: - if _, ok := p.constructors[memory]; !ok { - return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers)) - } - ps, err := p.constructors[memory].PSConstruct(configKey) - if err != nil { - return nil, errors.E(op, err) - } - - // save the initialized publisher channel - // for the in-memory, register new publishers - p.publishers[uuid.NewString()] = ps + drName := val.(map[string]interface{})[driver] - return ps, nil - case redis: - if _, ok := p.constructors[redis]; !ok { - return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers)) + // driver name should be a string + if drStr, ok := drName.(string); ok { + if _, ok := p.constructors[drStr]; !ok { + return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr)) } - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - ps, err := p.constructors[redis].PSConstruct(configKey) + // try local config first + if p.cfgPlugin.Has(configKey) { + ps, err := p.constructors[drStr].PSConstruct(configKey) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps - return ps, nil - // then try global if local does not exist - case p.cfgPlugin.Has(redis): - ps, err := p.constructors[redis].PSConstruct(configKey) + return ps, nil + } else { + // try global driver section + ps, err := p.constructors[drStr].PSConstruct(drStr) if err != nil { return nil, errors.E(op, err) } - // if section already exists, return new connection - if _, ok := p.publishers[configKey]; ok { - return ps, nil - } - - // if not - initialize a connection + // save the initialized publisher channel + // for the in-memory, register new publishers p.publishers[configKey] = ps + return ps, nil } } diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/ephemeral/consumer.go index 91b8eda9..91b8eda9 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/ephemeral/consumer.go diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/ephemeral/item.go index 3298424d..3298424d 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/ephemeral/item.go diff --git a/plugins/jobs/drivers/ephemeral/plugin.go b/plugins/ephemeral/plugin.go index 28495abb..28495abb 100644 --- a/plugins/jobs/drivers/ephemeral/plugin.go +++ b/plugins/ephemeral/plugin.go diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 9a19f96c..c6ca96c3 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -16,11 +16,6 @@ const PluginName string = "kv" const ( // driver is the mandatory field which should present in every storage driver string = "driver" - - memcached string = "memcached" - boltdb string = "boltdb" - redis string = "redis" - memory string = "memory" ) // Plugin for the unified storage @@ -52,40 +47,14 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { return nil } -func (p *Plugin) Serve() chan error { //nolint:gocognit +func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("kv_plugin_serve") // key - storage name in the config // value - storage - /* - For example we can have here 2 storages (but they are not pre-configured) - for the boltdb and memcached - We should provide here the actual configs for the all requested storages - kv: - boltdb-south: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - boltdb-north: - driver: boltdb - dir: "tests/rr-bolt" - file: "rr.db" - bucket: "rr" - permissions: 777 - ttl: 40s - - memcached: - driver: memcached - addr: [ "127.0.0.1:11211" ] - - - For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached - when user requests for example boltdb-south, we should provide that particular preconfigured storage - */ + // For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached + // when user requests for example boltdb-south, we should provide that particular preconfigured storage + for k, v := range p.cfg.Data { // for example if the key not properly formatted (yaml) if v == nil { @@ -109,30 +78,16 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // config key for the particular sub-driver kv.memcached configKey := fmt.Sprintf("%s.%s", PluginName, k) // at this point we know, that driver field present in the configuration - // TODO(rustatian): refactor, made generic, with checks like in the broadcast, websockets or jobs - switch v.(map[string]interface{})[driver] { - case memcached: - if _, ok := p.constructors[memcached]; !ok { - p.log.Warn("no memcached constructors registered", "registered", p.constructors) - continue - } - - storage, err := p.constructors[memcached].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage + drName := v.(map[string]interface{})[driver] - case boltdb: - if _, ok := p.constructors[boltdb]; !ok { - p.log.Warn("no boltdb constructors registered", "registered", p.constructors) + // driver name should be a string + if drStr, ok := drName.(string); ok { + if _, ok := p.constructors[drStr]; !ok { + p.log.Warn("no constructors registered", "requested constructor", drStr, "registered", p.constructors) continue } - storage, err := p.constructors[boltdb].KVConstruct(configKey) + storage, err := p.constructors[drStr].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -140,56 +95,9 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage - case memory: - if _, ok := p.constructors[memory]; !ok { - p.log.Warn("no in-memory constructors registered", "registered", p.constructors) - continue - } - - storage, err := p.constructors[memory].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - case redis: - if _, ok := p.constructors[redis]; !ok { - p.log.Warn("no redis constructors registered", "registered", p.constructors) - continue - } - - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - storage, err := p.constructors[redis].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - case p.cfgPlugin.Has(redis): - storage, err := p.constructors[redis].KVConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the storage - p.storages[k] = storage - continue - default: - // otherwise - error, no local or global config - p.log.Warn("no global or local redis configuration provided", "key", configKey) - continue - } - - default: - p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver]))) } + + continue } return errCh diff --git a/plugins/kv/drivers/memcached/config.go b/plugins/memcached/config.go index 6d413790..6d413790 100644 --- a/plugins/kv/drivers/memcached/config.go +++ b/plugins/memcached/config.go diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/memcached/driver.go index e24747fe..e24747fe 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/memcached/driver.go diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/memcached/plugin.go index 59a2b7cb..59a2b7cb 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/memcached/plugin.go diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/sqs/config.go index 9b2a1ca8..9b2a1ca8 100644 --- a/plugins/jobs/drivers/sqs/config.go +++ b/plugins/sqs/config.go diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/sqs/consumer.go index 23203190..23203190 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/sqs/consumer.go diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/sqs/item.go index 996adf6c..996adf6c 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/sqs/item.go diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/sqs/listener.go index a4280af2..a4280af2 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/sqs/listener.go diff --git a/plugins/jobs/drivers/sqs/plugin.go b/plugins/sqs/plugin.go index 54f61ff5..54f61ff5 100644 --- a/plugins/jobs/drivers/sqs/plugin.go +++ b/plugins/sqs/plugin.go diff --git a/tests/allocate-failed.php b/tests/allocate-failed.php new file mode 100644 index 00000000..8514ecc0 --- /dev/null +++ b/tests/allocate-failed.php @@ -0,0 +1,18 @@ +<?php + +declare(strict_types=1); + +use Spiral\Goridge\StreamRelay; +use Spiral\RoadRunner\Worker as RoadRunner; + +require __DIR__ . "/vendor/autoload.php"; + +if (file_exists('break')) { + throw new Exception('oops'); +} + +$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); + +while($rr->waitPayload()){ + $rr->respond(new \Spiral\RoadRunner\Payload("")); +} diff --git a/tests/plugins/jobs/jobs_amqp_test.go b/tests/plugins/jobs/jobs_amqp_test.go index 48d6515d..949698ec 100644 --- a/tests/plugins/jobs/jobs_amqp_test.go +++ b/tests/plugins/jobs/jobs_amqp_test.go @@ -14,10 +14,10 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_beanstalk_test.go b/tests/plugins/jobs/jobs_beanstalk_test.go index 8e74c7cc..9f4d37ec 100644 --- a/tests/plugins/jobs/jobs_beanstalk_test.go +++ b/tests/plugins/jobs/jobs_beanstalk_test.go @@ -14,10 +14,10 @@ import ( endure "github.com/spiral/endure/pkg/container" goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" + "github.com/spiral/roadrunner/v2/plugins/beanstalk" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_ephemeral_test.go b/tests/plugins/jobs/jobs_ephemeral_test.go index 98590a96..2890aa9d 100644 --- a/tests/plugins/jobs/jobs_ephemeral_test.go +++ b/tests/plugins/jobs/jobs_ephemeral_test.go @@ -15,9 +15,9 @@ import ( goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" jobState "github.com/spiral/roadrunner/v2/pkg/state/job" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" diff --git a/tests/plugins/jobs/jobs_general_test.go b/tests/plugins/jobs/jobs_general_test.go index f0b5697b..91855ee9 100644 --- a/tests/plugins/jobs/jobs_general_test.go +++ b/tests/plugins/jobs/jobs_general_test.go @@ -12,11 +12,11 @@ import ( "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/ephemeral" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/ephemeral" "github.com/spiral/roadrunner/v2/plugins/metrics" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 630a059a..95abe9dc 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -17,11 +17,11 @@ import ( "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" jobsv1beta "github.com/spiral/roadrunner/v2/proto/jobs/v1beta" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" diff --git a/tests/plugins/jobs/jobs_with_toxics_test.go b/tests/plugins/jobs/jobs_with_toxics_test.go index f6521e8d..80fed8eb 100644 --- a/tests/plugins/jobs/jobs_with_toxics_test.go +++ b/tests/plugins/jobs/jobs_with_toxics_test.go @@ -11,15 +11,15 @@ import ( toxiproxy "github.com/Shopify/toxiproxy/client" "github.com/golang/mock/gomock" endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner/v2/plugins/amqp" + "github.com/spiral/roadrunner/v2/plugins/beanstalk" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/informer" "github.com/spiral/roadrunner/v2/plugins/jobs" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/amqp" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/beanstalk" - "github.com/spiral/roadrunner/v2/plugins/jobs/drivers/sqs" "github.com/spiral/roadrunner/v2/plugins/resetter" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/sqs" "github.com/spiral/roadrunner/v2/tests/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go index a09a456c..e757a9e6 100644 --- a/tests/plugins/kv/storage_plugin_test.go +++ b/tests/plugins/kv/storage_plugin_test.go @@ -15,8 +15,8 @@ import ( "github.com/spiral/roadrunner/v2/plugins/boltdb" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" - "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memcached" "github.com/spiral/roadrunner/v2/plugins/memory" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" |