Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 78
1 file changed, 64 insertions, 14 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 348be199..bdd91423 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,12 +3,14 @@ package worker_watcher //nolint:stylecheck
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/spiral/errors"
 	"github.com/spiral/roadrunner/v2/pkg/events"
 	"github.com/spiral/roadrunner/v2/pkg/worker"
 	"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
+	"github.com/spiral/roadrunner/v2/utils"
 )
 
 // Vector interface represents vector container
@@ -30,21 +32,24 @@ type workerWatcher struct {
 	sync.RWMutex
 	container Vector
 	// used to control Destroy stage (that all workers are in the container)
-	numWorkers uint64
+	numWorkers *uint64
 
 	workers []worker.BaseProcess
 
-	allocator worker.Allocator
-	events    events.Handler
+	allocator       worker.Allocator
+	allocateTimeout time.Duration
+	events          events.Handler
 }
 
 // NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
 	ww := &workerWatcher{
-		container:  channel.NewVector(numWorkers),
-		numWorkers: numWorkers,
+		container: channel.NewVector(numWorkers),
 
-		workers: make([]worker.BaseProcess, 0, numWorkers),
+		// pass a ptr to the number of workers to avoid blocking in the TTL loop
+		numWorkers:      utils.Uint64(numWorkers),
+		allocateTimeout: allocateTimeout,
+		workers:         make([]worker.BaseProcess, 0, numWorkers),
 
 		allocator: allocator,
 		events:    events,
@@ -128,21 +133,57 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
 }
 
 func (ww *workerWatcher) Allocate() error {
-	ww.Lock()
 	const op = errors.Op("worker_watcher_allocate_new")
+
 	sw, err := ww.allocator()
 	if err != nil {
-		return errors.E(op, errors.WorkerAllocate, err)
+		// log incident
+		ww.events.Push(
+			events.WorkerEvent{
+				Event:   events.EventWorkerError,
+				Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
+			})
+
+		// if no timeout, return error immediately
+		if ww.allocateTimeout == 0 {
+			return errors.E(op, errors.WorkerAllocate, err)
+		}
+
+		tt := time.After(ww.allocateTimeout)
+		for {
+			select {
+			case <-tt:
+				// reduce number of workers
+				atomic.AddUint64(ww.numWorkers, ^uint64(0))
+				// timeout exceed, worker can't be allocated
+				return errors.E(op, errors.WorkerAllocate, err)
+			default:
+				sw, err = ww.allocator()
+				if err != nil {
+					// log incident
+					ww.events.Push(
+						events.WorkerEvent{
+							Event:   events.EventWorkerError,
+							Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
+						})
+					continue
+				}
+
+				// reallocated
+				goto done
+			}
+		}
 	}
 
+done:
 	// add worker to Wait
 	ww.addToWatch(sw)
 
+	ww.Lock()
 	// add new worker to the workers slice (to get information about workers in parallel)
 	ww.workers = append(ww.workers, sw)
-
-	// unlock Allocate mutex
 	ww.Unlock()
+
 	// push the worker to the container
 	ww.Release(sw)
 	return nil
@@ -160,7 +201,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
 	for i := 0; i < len(ww.workers); i++ {
 		if ww.workers[i].Pid() == pid {
 			ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
-			// kill worker
+			// kill worker, just to be sure it's dead
 			_ = wb.Kill()
 			return
 		}
@@ -177,7 +218,7 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) {
 	}
 }
 
-// Destroy all underlying container (but let them to complete the task)
+// Destroy all underlying container (but let them complete the task)
 func (ww *workerWatcher) Destroy(_ context.Context) {
 	// destroy container, we don't use ww mutex here, since we should be able to push worker
 	ww.Lock()
@@ -192,7 +233,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
 		case <-tt.C:
 			ww.Lock()
 			// that might be one of the workers is working
-			if ww.numWorkers != uint64(len(ww.workers)) {
+			if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
 				ww.Unlock()
 				continue
 			}
@@ -216,6 +257,10 @@ func (ww *workerWatcher) List() []worker.BaseProcess {
 	ww.RLock()
 	defer ww.RUnlock()
 
+	if len(ww.workers) == 0 {
+		return nil
+	}
+
 	base := make([]worker.BaseProcess, 0, len(ww.workers))
 	for i := 0; i < len(ww.workers); i++ {
 		base = append(base, ww.workers[i])
@@ -253,6 +298,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
 			Event:   events.EventPoolError,
 			Payload: errors.E(op, err),
 		})
+
+		// no workers at all, panic
+		if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
+			panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
+		}
 	}
 }
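The main behavioural change in this patch is that Allocate() no longer fails on the first allocator error: with a non-zero allocateTimeout it keeps retrying until the timeout channel fires, and only then decrements the worker counter and returns the error. The snippet below is a minimal, standalone sketch of that retry-with-timeout pattern under simplified assumptions; the allocator type, the function names, and the integer "worker" stand-in are illustrative only and are not part of the roadrunner API.

// Sketch only: a self-contained illustration of the retry-with-timeout
// allocation pattern introduced in the diff above. Types and names are
// simplified stand-ins, not the real roadrunner worker/allocator APIs.
package main

import (
	"errors"
	"fmt"
	"time"
)

// allocator is a stand-in for worker.Allocator; it returns a fake worker PID.
type allocator func() (int, error)

// allocateWithRetry mirrors the patched Allocate(): fail fast when timeout
// is zero, otherwise keep retrying until the timeout channel fires.
func allocateWithRetry(alloc allocator, timeout time.Duration) (int, error) {
	pid, err := alloc()
	if err == nil {
		return pid, nil
	}
	if timeout == 0 {
		// no timeout configured: return the error immediately
		return 0, fmt.Errorf("can't allocate worker: %w", err)
	}

	tt := time.After(timeout)
	for {
		select {
		case <-tt:
			// timeout exceeded, give up with the last error
			return 0, fmt.Errorf("allocate timeout exceeded: %w", err)
		default:
			pid, err = alloc()
			if err != nil {
				// retry attempt failed, try again
				continue
			}
			return pid, nil
		}
	}
}

func main() {
	attempts := 0
	// a flaky allocator that succeeds on the third attempt
	flaky := func() (int, error) {
		attempts++
		if attempts < 3 {
			return 0, errors.New("spawn failed")
		}
		return 4242, nil
	}

	pid, err := allocateWithRetry(flaky, time.Second)
	fmt.Println("pid:", pid, "err:", err, "attempts:", attempts)
}

One further detail worth noting from the diff: atomic.AddUint64(ww.numWorkers, ^uint64(0)) decrements the counter, since adding ^uint64(0) (all bits set, i.e. the two's-complement representation of -1) is the documented sync/atomic idiom for subtracting 1 from an unsigned value.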