diff options
Diffstat (limited to 'worker_watcher.go')
-rwxr-xr-x | worker_watcher.go | 59 |
1 files changed, 32 insertions, 27 deletions
diff --git a/worker_watcher.go b/worker_watcher.go index 5ae54024..f49eeacf 100755 --- a/worker_watcher.go +++ b/worker_watcher.go @@ -2,14 +2,14 @@ package roadrunner import ( "context" - "errors" "sync" "time" + "github.com/spiral/roadrunner/v2/errors" "github.com/spiral/roadrunner/v2/util" ) -var ErrWatcherStopped = errors.New("watcher stopped") +//var = errors.New("watcher stopped") type Stack struct { workers []WorkerBase @@ -85,11 +85,7 @@ type WorkerWatcher interface { } // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func newWorkerWatcher( - allocator func(args ...interface{}) (WorkerBase, error), - numWorkers int64, - events *util.EventHandler, -) *workerWatcher { +func newWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events *util.EventHandler) *workerWatcher { ww := &workerWatcher{ stack: NewWorkersStack(), allocator: allocator, @@ -127,10 +123,11 @@ func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) e } func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) { + const op = errors.Op("get_free_worker") // thread safe operation w, stop := ww.stack.Pop() if stop { - return nil, ErrWatcherStopped + return nil, errors.E(op, errors.ErrWatcherStopped) } // handle worker remove state @@ -146,6 +143,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) // no free stack if w == nil { + // TODO allocate timeout tout := time.NewTicker(time.Second * 180) defer tout.Stop() for { @@ -153,20 +151,20 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) default: w, stop = ww.stack.Pop() if stop { - return nil, ErrWatcherStopped + return nil, errors.E(op, errors.ErrWatcherStopped) } if w == nil { continue } - ww.decreaseNumOfActualWorkers() + ww.ReduceWorkersCount() return w, nil case <-tout.C: - return nil, errors.New("no free stack") + return nil, errors.Str("no free stack") } } } - ww.decreaseNumOfActualWorkers() + ww.ReduceWorkersCount() return w, nil } @@ -198,10 +196,10 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error // found in the stack // remove worker ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.decreaseNumOfActualWorkers() + ww.ReduceWorkersCount() wb.State().Set(StateInvalid) - err := wb.Kill(ctx) + err := wb.Kill() if err != nil { return err } @@ -215,14 +213,19 @@ func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error // O(1) operation func (ww *workerWatcher) PushWorker(w WorkerBase) { - ww.mutex.Lock() - ww.actualNumWorkers++ - ww.mutex.Unlock() + ww.IncreaseWorkersCount() ww.stack.Push(w) } func (ww *workerWatcher) ReduceWorkersCount() { - ww.decreaseNumOfActualWorkers() + ww.mutex.Lock() + ww.actualNumWorkers-- + ww.mutex.Unlock() +} +func (ww *workerWatcher) IncreaseWorkersCount() { + ww.mutex.Lock() + ww.actualNumWorkers++ + ww.mutex.Unlock() } // Destroy all underlying stack (but let them to complete the task) @@ -258,9 +261,17 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } } -// Warning, this is O(n) operation +// Warning, this is O(n) operation, and it will return copy of the actual workers func (ww *workerWatcher) WorkersList() []WorkerBase { - return ww.stack.workers + ww.stack.mutex.Lock() + defer ww.stack.mutex.Unlock() + workersCopy := make([]WorkerBase, 0, 1) + for _, v := range ww.stack.workers { + sw := v.(SyncWorker) + workersCopy = append(workersCopy, sw) + } + + return workersCopy } func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { @@ -284,7 +295,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { // worker in the stack, reallocating if ww.stack.workers[i].Pid() == pid { ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.decreaseNumOfActualWorkers() + ww.ReduceWorkersCount() ww.stack.mutex.Unlock() err = ww.AllocateNew(ctx) @@ -321,9 +332,3 @@ func (ww *workerWatcher) addToWatch(wb WorkerBase) { ww.wait(context.Background(), wb) }() } - -func (ww *workerWatcher) decreaseNumOfActualWorkers() { - ww.mutex.Lock() - ww.actualNumWorkers-- - ww.mutex.Unlock() -} |