summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/stack.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-08 23:21:54 +0300
committerValery Piashchynski <[email protected]>2021-02-08 23:21:54 +0300
commitda64d9fbab7d73e203e7dbbb9503f4d422feaab0 (patch)
tree3dc3d5dd4a8c4de7d4b57baf2eeb1089f831bc1c /pkg/worker_watcher/stack.go
parent3e92e3df723ca1c4f152d8526eebfd7184e6fcec (diff)
BaseProcess interface as a return type in the worker_watcher,pool and
worker interface
Diffstat (limited to 'pkg/worker_watcher/stack.go')
-rw-r--r--pkg/worker_watcher/stack.go58
1 files changed, 29 insertions, 29 deletions
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index 9a0bc6a4..69e2024b 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -9,8 +9,8 @@ import (
)
type Stack struct {
- workers []*worker.SyncWorkerImpl
- mutex sync.RWMutex
+ sync.RWMutex
+ workers []worker.BaseProcess
destroy bool
actualNumOfWorkers uint64
initialNumOfWorkers uint64
@@ -19,15 +19,15 @@ type Stack struct {
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
w := runtime.NumCPU()
return &Stack{
- workers: make([]*worker.SyncWorkerImpl, 0, w),
+ workers: make([]worker.BaseProcess, 0, w),
actualNumOfWorkers: 0,
initialNumOfWorkers: initialNumOfWorkers,
}
}
func (stack *Stack) Reset() {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
stack.actualNumOfWorkers = 0
stack.workers = nil
}
@@ -35,21 +35,21 @@ func (stack *Stack) Reset() {
// Push worker back to the stack
// If stack in destroy state, Push will provide 100ms window to unlock the mutex
func (stack *Stack) Push(w worker.BaseProcess) {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
stack.actualNumOfWorkers++
- stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl))
+ stack.workers = append(stack.workers, w)
}
func (stack *Stack) IsEmpty() bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
return len(stack.workers) == 0
}
-func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+func (stack *Stack) Pop() (worker.BaseProcess, bool) {
+ stack.Lock()
+ defer stack.Unlock()
// do not release new stack
if stack.destroy {
@@ -68,8 +68,8 @@ func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
}
func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
for i := 0; i < len(stack.workers); i++ {
// worker in the stack, reallocating
if stack.workers[i].Pid() == pid {
@@ -84,10 +84,10 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
}
// Workers return copy of the workers in the stack
-func (stack *Stack) Workers() []worker.SyncWorker {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
- workersCopy := make([]worker.SyncWorker, 0, 1)
+func (stack *Stack) Workers() []worker.BaseProcess {
+ stack.Lock()
+ defer stack.Unlock()
+ workersCopy := make([]worker.BaseProcess, 0, 1)
// copy
// TODO pointers, copy have no sense
for _, v := range stack.workers {
@@ -100,40 +100,40 @@ func (stack *Stack) Workers() []worker.SyncWorker {
}
func (stack *Stack) isDestroying() bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
return stack.destroy
}
// we also have to give a chance to pool to Push worker (return it)
-func (stack *Stack) Destroy(ctx context.Context) {
- stack.mutex.Lock()
+func (stack *Stack) Destroy(_ context.Context) {
+ stack.Lock()
stack.destroy = true
- stack.mutex.Unlock()
+ stack.Unlock()
tt := time.NewTicker(time.Millisecond * 500)
defer tt.Stop()
for {
select {
case <-tt.C:
- stack.mutex.Lock()
+ stack.Lock()
// that might be one of the workers is working
if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
- stack.mutex.Unlock()
+ stack.Unlock()
continue
}
- stack.mutex.Unlock()
+ stack.Unlock()
// unnecessary mutex, but
// just to make sure. All stack at this moment are in the stack
// Pop operation is blocked, push can't be done, since it's not possible to pop
- stack.mutex.Lock()
+ stack.Lock()
for i := 0; i < len(stack.workers); i++ {
// set state for the stack in the stack (unused at the moment)
stack.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
_ = stack.workers[i].Kill()
}
- stack.mutex.Unlock()
+ stack.Unlock()
// clear
stack.Reset()
return