diff options
author | Valery Piashchynski <[email protected]> | 2021-02-08 23:21:54 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-08 23:21:54 +0300 |
commit | da64d9fbab7d73e203e7dbbb9503f4d422feaab0 (patch) | |
tree | 3dc3d5dd4a8c4de7d4b57baf2eeb1089f831bc1c /pkg/worker_watcher/stack.go | |
parent | 3e92e3df723ca1c4f152d8526eebfd7184e6fcec (diff) |
BaseProcess interface as a return type in the worker_watcher,pool and
worker interface
Diffstat (limited to 'pkg/worker_watcher/stack.go')
-rw-r--r-- | pkg/worker_watcher/stack.go | 58 |
1 files changed, 29 insertions, 29 deletions
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index 9a0bc6a4..69e2024b 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -9,8 +9,8 @@ import ( ) type Stack struct { - workers []*worker.SyncWorkerImpl - mutex sync.RWMutex + sync.RWMutex + workers []worker.BaseProcess destroy bool actualNumOfWorkers uint64 initialNumOfWorkers uint64 @@ -19,15 +19,15 @@ type Stack struct { func NewWorkersStack(initialNumOfWorkers uint64) *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]*worker.SyncWorkerImpl, 0, w), + workers: make([]worker.BaseProcess, 0, w), actualNumOfWorkers: 0, initialNumOfWorkers: initialNumOfWorkers, } } func (stack *Stack) Reset() { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() stack.actualNumOfWorkers = 0 stack.workers = nil } @@ -35,21 +35,21 @@ func (stack *Stack) Reset() { // Push worker back to the stack // If stack in destroy state, Push will provide 100ms window to unlock the mutex func (stack *Stack) Push(w worker.BaseProcess) { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl)) + stack.workers = append(stack.workers, w) } func (stack *Stack) IsEmpty() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() return len(stack.workers) == 0 } -func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { - stack.mutex.Lock() - defer stack.mutex.Unlock() +func (stack *Stack) Pop() (worker.BaseProcess, bool) { + stack.Lock() + defer stack.Unlock() // do not release new stack if stack.destroy { @@ -68,8 +68,8 @@ func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) { } func (stack *Stack) FindAndRemoveByPid(pid int64) bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() for i := 0; i < len(stack.workers); i++ { // worker in the stack, reallocating if stack.workers[i].Pid() == pid { @@ -84,10 +84,10 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool { } // Workers return copy of the workers in the stack -func (stack *Stack) Workers() []worker.SyncWorker { - stack.mutex.Lock() - defer stack.mutex.Unlock() - workersCopy := make([]worker.SyncWorker, 0, 1) +func (stack *Stack) Workers() []worker.BaseProcess { + stack.Lock() + defer stack.Unlock() + workersCopy := make([]worker.BaseProcess, 0, 1) // copy // TODO pointers, copy have no sense for _, v := range stack.workers { @@ -100,40 +100,40 @@ func (stack *Stack) Workers() []worker.SyncWorker { } func (stack *Stack) isDestroying() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() + stack.Lock() + defer stack.Unlock() return stack.destroy } // we also have to give a chance to pool to Push worker (return it) -func (stack *Stack) Destroy(ctx context.Context) { - stack.mutex.Lock() +func (stack *Stack) Destroy(_ context.Context) { + stack.Lock() stack.destroy = true - stack.mutex.Unlock() + stack.Unlock() tt := time.NewTicker(time.Millisecond * 500) defer tt.Stop() for { select { case <-tt.C: - stack.mutex.Lock() + stack.Lock() // that might be one of the workers is working if stack.initialNumOfWorkers != stack.actualNumOfWorkers { - stack.mutex.Unlock() + stack.Unlock() continue } - stack.mutex.Unlock() + stack.Unlock() // unnecessary mutex, but // just to make sure. All stack at this moment are in the stack // Pop operation is blocked, push can't be done, since it's not possible to pop - stack.mutex.Lock() + stack.Lock() for i := 0; i < len(stack.workers); i++ { // set state for the stack in the stack (unused at the moment) stack.workers[i].State().Set(worker.StateDestroyed) // kill the worker _ = stack.workers[i].Kill() } - stack.mutex.Unlock() + stack.Unlock() // clear stack.Reset() return |