diff options
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pool/static_pool.go | 2 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 50 | ||||
-rw-r--r-- | pkg/worker_watcher/container/channel/vec.go | 5 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 78 |
4 files changed, 116 insertions, 19 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 3eb0714f..720ca9da 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -78,7 +78,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg // set up workers allocator p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) // set up workers watcher - p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) + p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout) // allocate requested number of workers workers, err := p.allocateWorkers(p.cfg.NumWorkers) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index d1b24574..14df513e 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -2,6 +2,7 @@ package pool import ( "context" + "os" "os/exec" "testing" "time" @@ -361,3 +362,52 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { <-block p.Destroy(context.Background()) } + +func TestSupervisedPool_AllocateFailedOK(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(2), + AllocateTimeout: time.Second * 15, + DestroyTimeout: time.Second * 5, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 5 * time.Second, + }, + } + + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + require.NotNil(t, p) + + time.Sleep(time.Second) + + // should be ok + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + require.NoError(t, err) + + // after creating this file, PHP will fail + file, err := os.Create("break") + require.NoError(t, err) + + time.Sleep(time.Second * 5) + assert.NoError(t, file.Close()) + assert.NoError(t, os.Remove("break")) + + defer func() { + if r := recover(); r != nil { + assert.Fail(t, "panic should not be fired!") + } else { + p.Destroy(context.Background()) + } + }() +} diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go index 7fb65a92..5605f1e0 100644 --- a/pkg/worker_watcher/container/channel/vec.go +++ b/pkg/worker_watcher/container/channel/vec.go @@ -15,14 +15,11 @@ type Vec struct { destroy uint64 // channel with the workers workers chan worker.BaseProcess - - len uint64 } func NewVector(len uint64) *Vec { vec := &Vec{ destroy: 0, - len: len, workers: make(chan worker.BaseProcess, len), } @@ -48,7 +45,7 @@ func (v *Vec) Push(w worker.BaseProcess) { 1. TTL is set with no requests during the TTL 2. Violated Get <-> Release operation (how ??) */ - for i := uint64(0); i < v.len; i++ { + for i := 0; i < len(v.workers); i++ { /* We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states. */ diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 348be199..bdd91423 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -3,12 +3,14 @@ package worker_watcher //nolint:stylecheck import ( "context" "sync" + "sync/atomic" "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel" + "github.com/spiral/roadrunner/v2/utils" ) // Vector interface represents vector container @@ -30,21 +32,24 @@ type workerWatcher struct { sync.RWMutex container Vector // used to control Destroy stage (that all workers are in the container) - numWorkers uint64 + numWorkers *uint64 workers []worker.BaseProcess - allocator worker.Allocator - events events.Handler + allocator worker.Allocator + allocateTimeout time.Duration + events events.Handler } // NewSyncWorkerWatcher is a constructor for the Watcher -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher { ww := &workerWatcher{ - container: channel.NewVector(numWorkers), - numWorkers: numWorkers, + container: channel.NewVector(numWorkers), - workers: make([]worker.BaseProcess, 0, numWorkers), + // pass a ptr to the number of workers to avoid blocking in the TTL loop + numWorkers: utils.Uint64(numWorkers), + allocateTimeout: allocateTimeout, + workers: make([]worker.BaseProcess, 0, numWorkers), allocator: allocator, events: events, @@ -128,21 +133,57 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) { } func (ww *workerWatcher) Allocate() error { - ww.Lock() const op = errors.Op("worker_watcher_allocate_new") + sw, err := ww.allocator() if err != nil { - return errors.E(op, errors.WorkerAllocate, err) + // log incident + ww.events.Push( + events.WorkerEvent{ + Event: events.EventWorkerError, + Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)), + }) + + // if no timeout, return error immediately + if ww.allocateTimeout == 0 { + return errors.E(op, errors.WorkerAllocate, err) + } + + tt := time.After(ww.allocateTimeout) + for { + select { + case <-tt: + // reduce number of workers + atomic.AddUint64(ww.numWorkers, ^uint64(0)) + // timeout exceed, worker can't be allocated + return errors.E(op, errors.WorkerAllocate, err) + default: + sw, err = ww.allocator() + if err != nil { + // log incident + ww.events.Push( + events.WorkerEvent{ + Event: events.EventWorkerError, + Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)), + }) + continue + } + + // reallocated + goto done + } + } } +done: // add worker to Wait ww.addToWatch(sw) + ww.Lock() // add new worker to the workers slice (to get information about workers in parallel) ww.workers = append(ww.workers, sw) - - // unlock Allocate mutex ww.Unlock() + // push the worker to the container ww.Release(sw) return nil @@ -160,7 +201,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { for i := 0; i < len(ww.workers); i++ { if ww.workers[i].Pid() == pid { ww.workers = append(ww.workers[:i], ww.workers[i+1:]...) - // kill worker + // kill worker, just to be sure it's dead _ = wb.Kill() return } @@ -177,7 +218,7 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) { } } -// Destroy all underlying container (but let them to complete the task) +// Destroy all underlying container (but let them complete the task) func (ww *workerWatcher) Destroy(_ context.Context) { // destroy container, we don't use ww mutex here, since we should be able to push worker ww.Lock() @@ -192,7 +233,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) { case <-tt.C: ww.Lock() // that might be one of the workers is working - if ww.numWorkers != uint64(len(ww.workers)) { + if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) { ww.Unlock() continue } @@ -216,6 +257,10 @@ func (ww *workerWatcher) List() []worker.BaseProcess { ww.RLock() defer ww.RUnlock() + if len(ww.workers) == 0 { + return nil + } + base := make([]worker.BaseProcess, 0, len(ww.workers)) for i := 0; i < len(ww.workers); i++ { base = append(base, ww.workers[i]) @@ -253,6 +298,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { Event: events.EventPoolError, Payload: errors.E(op, err), }) + + // no workers at all, panic + if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 { + panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err))) + } } } |