summaryrefslogtreecommitdiff
path: root/worker_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker_watcher.go')
-rwxr-xr-xworker_watcher.go201
1 files changed, 101 insertions, 100 deletions
diff --git a/worker_watcher.go b/worker_watcher.go
index 8bc147d0..f8fb67a9 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -11,40 +11,46 @@ import (
)
type Stack struct {
- workers []WorkerBase
- mutex sync.RWMutex
- destroy bool
+ workers []WorkerBase
+ mutex sync.RWMutex
+ destroy bool
+ actualNumOfWorkers int64
}
func NewWorkersStack() *Stack {
+ w := runtime.NumCPU()
return &Stack{
- workers: make([]WorkerBase, 0, runtime.NumCPU()),
+ workers: make([]WorkerBase, 0, w),
+ actualNumOfWorkers: 0,
}
}
func (stack *Stack) Reset() {
stack.mutex.Lock()
defer stack.mutex.Unlock()
-
+ stack.actualNumOfWorkers = 0
stack.workers = nil
}
+// Push worker back to the stack
+// If stack in destroy state, Push will provide 100ms window to unlock the mutex
func (stack *Stack) Push(w WorkerBase) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
+ stack.actualNumOfWorkers++
stack.workers = append(stack.workers, w)
}
func (stack *Stack) IsEmpty() bool {
stack.mutex.Lock()
defer stack.mutex.Unlock()
-
return len(stack.workers) == 0
}
func (stack *Stack) Pop() (WorkerBase, bool) {
stack.mutex.Lock()
defer stack.mutex.Unlock()
+
// do not release new stack
if stack.destroy {
return nil, true
@@ -54,12 +60,83 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
return nil, false
}
+ // move worker
w := stack.workers[len(stack.workers)-1]
stack.workers = stack.workers[:len(stack.workers)-1]
-
+ stack.actualNumOfWorkers--
return w, false
}
+func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ for i := 0; i < len(stack.workers); i++ {
+ // worker in the stack, reallocating
+ if stack.workers[i].Pid() == pid {
+ stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
+ stack.actualNumOfWorkers--
+ // worker found and removed
+ return true
+ }
+ }
+ // no worker with such ID
+ return false
+}
+
+// Workers return copy of the workers in the stack
+func (stack *Stack) Workers() []WorkerBase {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ workersCopy := make([]WorkerBase, 0, 1)
+ // copy
+ for _, v := range stack.workers {
+ sw := v.(SyncWorker)
+ workersCopy = append(workersCopy, sw)
+ }
+
+ return workersCopy
+}
+
+func (stack *Stack) isDestroying() bool {
+ stack.mutex.Lock()
+ defer stack.mutex.Unlock()
+ return stack.destroy
+}
+
+// we also have to give a chance to pool to Push worker (return it)
+func (stack *Stack) Destroy(ctx context.Context) {
+ stack.mutex.Lock()
+ stack.destroy = true
+ stack.mutex.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 100)
+ for {
+ select {
+ case <-tt.C:
+ stack.mutex.Lock()
+ // that might be one of the workers is working
+ if len(stack.workers) != int(stack.actualNumOfWorkers) {
+ stack.mutex.Unlock()
+ continue
+ }
+ stack.mutex.Unlock()
+ // unnecessary mutex, but
+ // just to make sure. All stack at this moment are in the stack
+ // Pop operation is blocked, push can't be done, since it's not possible to pop
+ stack.mutex.Lock()
+ for i := 0; i < len(stack.workers); i++ {
+ // set state for the stack in the stack (unused at the moment)
+ stack.workers[i].State().Set(StateDestroyed)
+ }
+ stack.mutex.Unlock()
+ tt.Stop()
+ // clear
+ stack.Reset()
+ return
+ }
+ }
+}
+
type WorkerWatcher interface {
// AddToWatch used to add stack to wait its state
AddToWatch(workers []WorkerBase) error
@@ -151,7 +228,6 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
if w == nil {
continue
}
- ww.ReduceWorkersCount()
return w, nil
case <-ctx.Done():
return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
@@ -159,7 +235,6 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
}
}
- ww.ReduceWorkersCount()
return w, nil
}
@@ -184,91 +259,41 @@ func (ww *workerWatcher) AllocateNew() error {
}
func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error {
- ww.stack.mutex.Lock()
+ ww.mutex.Lock()
+ defer ww.mutex.Unlock()
+
const op = errors.Op("remove worker")
- defer ww.stack.mutex.Unlock()
pid := wb.Pid()
- for i := 0; i < len(ww.stack.workers); i++ {
- if ww.stack.workers[i].Pid() == pid {
- // found in the stack
- // remove worker
- ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.ReduceWorkersCount()
-
- wb.State().Set(StateInvalid)
- err := wb.Kill()
- if err != nil {
- return errors.E(op, err)
- }
- break
+
+ if ww.stack.FindAndRemoveByPid(pid) {
+ wb.State().Set(StateInvalid)
+ err := wb.Kill()
+ if err != nil {
+ return errors.E(op, err)
}
+ return nil
}
- // worker currently handle request, set state Remove
+
wb.State().Set(StateRemove)
return nil
}
// O(1) operation
func (ww *workerWatcher) PushWorker(w WorkerBase) {
- ww.IncreaseWorkersCount()
- ww.stack.Push(w)
-}
-
-func (ww *workerWatcher) ReduceWorkersCount() {
- ww.mutex.Lock()
- ww.actualNumWorkers--
- ww.mutex.Unlock()
-}
-func (ww *workerWatcher) IncreaseWorkersCount() {
ww.mutex.Lock()
- ww.actualNumWorkers++
- ww.mutex.Unlock()
+ defer ww.mutex.Unlock()
+ ww.stack.Push(w)
}
// Destroy all underlying stack (but let them to complete the task)
func (ww *workerWatcher) Destroy(ctx context.Context) {
- ww.stack.mutex.Lock()
- ww.stack.destroy = true
- ww.stack.mutex.Unlock()
-
- tt := time.NewTicker(time.Millisecond * 100)
- for {
- select {
- case <-tt.C:
- ww.stack.mutex.Lock()
- if len(ww.stack.workers) != int(ww.actualNumWorkers) {
- ww.stack.mutex.Unlock()
- continue
- }
- ww.stack.mutex.Unlock()
- // unnecessary mutex, but
- // just to make sure. All stack at this moment are in the stack
- // Pop operation is blocked, push can't be done, since it's not possible to pop
- ww.stack.mutex.Lock()
- for i := 0; i < len(ww.stack.workers); i++ {
- // set state for the stack in the stack (unused at the moment)
- ww.stack.workers[i].State().Set(StateDestroyed)
- }
- ww.stack.mutex.Unlock()
- tt.Stop()
- // clear
- ww.stack.Reset()
- return
- }
- }
+ // destroy stack, we don't use ww mutex here, since we should be able to push worker
+ ww.stack.Destroy(ctx)
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
func (ww *workerWatcher) WorkersList() []WorkerBase {
- ww.stack.mutex.Lock()
- defer ww.stack.mutex.Unlock()
- workersCopy := make([]WorkerBase, 0, 1)
- for _, v := range ww.stack.workers {
- sw := v.(SyncWorker)
- workersCopy = append(workersCopy, sw)
- }
-
- return workersCopy
+ return ww.stack.Workers()
}
func (ww *workerWatcher) wait(w WorkerBase) {
@@ -287,37 +312,13 @@ func (ww *workerWatcher) wait(w WorkerBase) {
return
}
- pid := w.Pid()
- ww.stack.mutex.Lock()
- for i := 0; i < len(ww.stack.workers); i++ {
- // worker in the stack, reallocating
- if ww.stack.workers[i].Pid() == pid {
- ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.ReduceWorkersCount()
- ww.stack.mutex.Unlock()
-
- err = ww.AllocateNew()
- if err != nil {
- ww.events.Push(PoolEvent{
- Event: EventPoolError,
- Payload: errors.E(op, err),
- })
- }
-
- return
- }
- }
-
- ww.stack.mutex.Unlock()
-
- // worker not in the stack (not returned), forget and allocate new
+ _ = ww.stack.FindAndRemoveByPid(w.Pid())
err = ww.AllocateNew()
if err != nil {
ww.events.Push(PoolEvent{
Event: EventPoolError,
Payload: errors.E(op, err),
})
- return
}
}