diff options
Diffstat (limited to 'worker_watcher.go')
-rwxr-xr-x | worker_watcher.go | 201 |
1 files changed, 101 insertions, 100 deletions
diff --git a/worker_watcher.go b/worker_watcher.go index 8bc147d0..f8fb67a9 100755 --- a/worker_watcher.go +++ b/worker_watcher.go @@ -11,40 +11,46 @@ import ( ) type Stack struct { - workers []WorkerBase - mutex sync.RWMutex - destroy bool + workers []WorkerBase + mutex sync.RWMutex + destroy bool + actualNumOfWorkers int64 } func NewWorkersStack() *Stack { + w := runtime.NumCPU() return &Stack{ - workers: make([]WorkerBase, 0, runtime.NumCPU()), + workers: make([]WorkerBase, 0, w), + actualNumOfWorkers: 0, } } func (stack *Stack) Reset() { stack.mutex.Lock() defer stack.mutex.Unlock() - + stack.actualNumOfWorkers = 0 stack.workers = nil } +// Push worker back to the stack +// If stack in destroy state, Push will provide 100ms window to unlock the mutex func (stack *Stack) Push(w WorkerBase) { stack.mutex.Lock() defer stack.mutex.Unlock() + stack.actualNumOfWorkers++ stack.workers = append(stack.workers, w) } func (stack *Stack) IsEmpty() bool { stack.mutex.Lock() defer stack.mutex.Unlock() - return len(stack.workers) == 0 } func (stack *Stack) Pop() (WorkerBase, bool) { stack.mutex.Lock() defer stack.mutex.Unlock() + // do not release new stack if stack.destroy { return nil, true @@ -54,12 +60,83 @@ func (stack *Stack) Pop() (WorkerBase, bool) { return nil, false } + // move worker w := stack.workers[len(stack.workers)-1] stack.workers = stack.workers[:len(stack.workers)-1] - + stack.actualNumOfWorkers-- return w, false } +func (stack *Stack) FindAndRemoveByPid(pid int64) bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + for i := 0; i < len(stack.workers); i++ { + // worker in the stack, reallocating + if stack.workers[i].Pid() == pid { + stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) + stack.actualNumOfWorkers-- + // worker found and removed + return true + } + } + // no worker with such ID + return false +} + +// Workers return copy of the workers in the stack +func (stack *Stack) Workers() []WorkerBase { + stack.mutex.Lock() + defer stack.mutex.Unlock() + workersCopy := make([]WorkerBase, 0, 1) + // copy + for _, v := range stack.workers { + sw := v.(SyncWorker) + workersCopy = append(workersCopy, sw) + } + + return workersCopy +} + +func (stack *Stack) isDestroying() bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + return stack.destroy +} + +// we also have to give a chance to pool to Push worker (return it) +func (stack *Stack) Destroy(ctx context.Context) { + stack.mutex.Lock() + stack.destroy = true + stack.mutex.Unlock() + + tt := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-tt.C: + stack.mutex.Lock() + // that might be one of the workers is working + if len(stack.workers) != int(stack.actualNumOfWorkers) { + stack.mutex.Unlock() + continue + } + stack.mutex.Unlock() + // unnecessary mutex, but + // just to make sure. All stack at this moment are in the stack + // Pop operation is blocked, push can't be done, since it's not possible to pop + stack.mutex.Lock() + for i := 0; i < len(stack.workers); i++ { + // set state for the stack in the stack (unused at the moment) + stack.workers[i].State().Set(StateDestroyed) + } + stack.mutex.Unlock() + tt.Stop() + // clear + stack.Reset() + return + } + } +} + type WorkerWatcher interface { // AddToWatch used to add stack to wait its state AddToWatch(workers []WorkerBase) error @@ -151,7 +228,6 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) if w == nil { continue } - ww.ReduceWorkersCount() return w, nil case <-ctx.Done(): return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) @@ -159,7 +235,6 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) } } - ww.ReduceWorkersCount() return w, nil } @@ -184,91 +259,41 @@ func (ww *workerWatcher) AllocateNew() error { } func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error { - ww.stack.mutex.Lock() + ww.mutex.Lock() + defer ww.mutex.Unlock() + const op = errors.Op("remove worker") - defer ww.stack.mutex.Unlock() pid := wb.Pid() - for i := 0; i < len(ww.stack.workers); i++ { - if ww.stack.workers[i].Pid() == pid { - // found in the stack - // remove worker - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.ReduceWorkersCount() - - wb.State().Set(StateInvalid) - err := wb.Kill() - if err != nil { - return errors.E(op, err) - } - break + + if ww.stack.FindAndRemoveByPid(pid) { + wb.State().Set(StateInvalid) + err := wb.Kill() + if err != nil { + return errors.E(op, err) } + return nil } - // worker currently handle request, set state Remove + wb.State().Set(StateRemove) return nil } // O(1) operation func (ww *workerWatcher) PushWorker(w WorkerBase) { - ww.IncreaseWorkersCount() - ww.stack.Push(w) -} - -func (ww *workerWatcher) ReduceWorkersCount() { - ww.mutex.Lock() - ww.actualNumWorkers-- - ww.mutex.Unlock() -} -func (ww *workerWatcher) IncreaseWorkersCount() { ww.mutex.Lock() - ww.actualNumWorkers++ - ww.mutex.Unlock() + defer ww.mutex.Unlock() + ww.stack.Push(w) } // Destroy all underlying stack (but let them to complete the task) func (ww *workerWatcher) Destroy(ctx context.Context) { - ww.stack.mutex.Lock() - ww.stack.destroy = true - ww.stack.mutex.Unlock() - - tt := time.NewTicker(time.Millisecond * 100) - for { - select { - case <-tt.C: - ww.stack.mutex.Lock() - if len(ww.stack.workers) != int(ww.actualNumWorkers) { - ww.stack.mutex.Unlock() - continue - } - ww.stack.mutex.Unlock() - // unnecessary mutex, but - // just to make sure. All stack at this moment are in the stack - // Pop operation is blocked, push can't be done, since it's not possible to pop - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // set state for the stack in the stack (unused at the moment) - ww.stack.workers[i].State().Set(StateDestroyed) - } - ww.stack.mutex.Unlock() - tt.Stop() - // clear - ww.stack.Reset() - return - } - } + // destroy stack, we don't use ww mutex here, since we should be able to push worker + ww.stack.Destroy(ctx) } // Warning, this is O(n) operation, and it will return copy of the actual workers func (ww *workerWatcher) WorkersList() []WorkerBase { - ww.stack.mutex.Lock() - defer ww.stack.mutex.Unlock() - workersCopy := make([]WorkerBase, 0, 1) - for _, v := range ww.stack.workers { - sw := v.(SyncWorker) - workersCopy = append(workersCopy, sw) - } - - return workersCopy + return ww.stack.Workers() } func (ww *workerWatcher) wait(w WorkerBase) { @@ -287,37 +312,13 @@ func (ww *workerWatcher) wait(w WorkerBase) { return } - pid := w.Pid() - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // worker in the stack, reallocating - if ww.stack.workers[i].Pid() == pid { - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.ReduceWorkersCount() - ww.stack.mutex.Unlock() - - err = ww.AllocateNew() - if err != nil { - ww.events.Push(PoolEvent{ - Event: EventPoolError, - Payload: errors.E(op, err), - }) - } - - return - } - } - - ww.stack.mutex.Unlock() - - // worker not in the stack (not returned), forget and allocate new + _ = ww.stack.FindAndRemoveByPid(w.Pid()) err = ww.AllocateNew() if err != nil { ww.events.Push(PoolEvent{ Event: EventPoolError, Payload: errors.E(op, err), }) - return } } |