diff options
author | Valery Piashchynski <[email protected]> | 2021-01-15 09:41:27 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-15 09:41:27 +0300 |
commit | f648d96c8516652f3231eb87be1f0a000e12793f (patch) | |
tree | 018c67e10f569812271270c2d1e488b5f13983dd /pkg | |
parent | 83e510d734815968a998fa8e4c14ac0900b9c485 (diff) |
Move stack data structure to the separate file
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/worker_watcher/stack.go | 138 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 130 |
2 files changed, 138 insertions, 130 deletions
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go new file mode 100644 index 00000000..55f1f52a --- /dev/null +++ b/pkg/worker_watcher/stack.go @@ -0,0 +1,138 @@ +package worker_watcher //nolint:golint,stylecheck +import ( + "context" + "runtime" + "sync" + "time" + + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/internal" +) + +type Stack struct { + workers []worker.BaseProcess + mutex sync.RWMutex + destroy bool + actualNumOfWorkers int64 +} + +func NewWorkersStack() *Stack { + w := runtime.NumCPU() + return &Stack{ + workers: make([]worker.BaseProcess, 0, w), + actualNumOfWorkers: 0, + } +} + +func (stack *Stack) Reset() { + stack.mutex.Lock() + defer stack.mutex.Unlock() + stack.actualNumOfWorkers = 0 + stack.workers = nil +} + +// Push worker back to the stack +// If stack in destroy state, Push will provide 100ms window to unlock the mutex +func (stack *Stack) Push(w worker.BaseProcess) { + stack.mutex.Lock() + defer stack.mutex.Unlock() + stack.actualNumOfWorkers++ + stack.workers = append(stack.workers, w) +} + +func (stack *Stack) IsEmpty() bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + return len(stack.workers) == 0 +} + +func (stack *Stack) Pop() (worker.BaseProcess, bool) { + stack.mutex.Lock() + defer stack.mutex.Unlock() + + // do not release new stack + if stack.destroy { + return nil, true + } + + if len(stack.workers) == 0 { + return nil, false + } + + // move worker + w := stack.workers[len(stack.workers)-1] + stack.workers = stack.workers[:len(stack.workers)-1] + stack.actualNumOfWorkers-- + return w, false +} + +func (stack *Stack) FindAndRemoveByPid(pid int64) bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + for i := 0; i < len(stack.workers); i++ { + // worker in the stack, reallocating + if stack.workers[i].Pid() == pid { + stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) + stack.actualNumOfWorkers-- + // worker found and removed + return true + } + } + // no worker with such ID + return false +} + +// Workers return copy of the workers in the stack +func (stack *Stack) Workers() []worker.BaseProcess { + stack.mutex.Lock() + defer stack.mutex.Unlock() + workersCopy := make([]worker.BaseProcess, 0, 1) + // copy + for _, v := range stack.workers { + workersCopy = append(workersCopy, v) + } + + return workersCopy +} + +func (stack *Stack) isDestroying() bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + return stack.destroy +} + +// we also have to give a chance to pool to Push worker (return it) +func (stack *Stack) Destroy(ctx context.Context) { + stack.mutex.Lock() + stack.destroy = true + stack.mutex.Unlock() + + tt := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-tt.C: + stack.mutex.Lock() + // that might be one of the workers is working + if len(stack.workers) != int(stack.actualNumOfWorkers) { + stack.mutex.Unlock() + continue + } + stack.mutex.Unlock() + // unnecessary mutex, but + // just to make sure. All stack at this moment are in the stack + // Pop operation is blocked, push can't be done, since it's not possible to pop + stack.mutex.Lock() + for i := 0; i < len(stack.workers); i++ { + // set state for the stack in the stack (unused at the moment) + stack.workers[i].State().Set(internal.StateDestroyed) + // kill the worker + _ = stack.workers[i].Kill() + } + stack.mutex.Unlock() + tt.Stop() + // clear + stack.Reset() + return + } + } +} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 127dc801..0c086d5f 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -2,9 +2,7 @@ package worker_watcher //nolint:golint,stylecheck import ( "context" - "runtime" "sync" - "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/events" @@ -12,134 +10,6 @@ import ( "github.com/spiral/roadrunner/v2/internal" ) -type Stack struct { - workers []worker.BaseProcess - mutex sync.RWMutex - destroy bool - actualNumOfWorkers int64 -} - -func NewWorkersStack() *Stack { - w := runtime.NumCPU() - return &Stack{ - workers: make([]worker.BaseProcess, 0, w), - actualNumOfWorkers: 0, - } -} - -func (stack *Stack) Reset() { - stack.mutex.Lock() - defer stack.mutex.Unlock() - stack.actualNumOfWorkers = 0 - stack.workers = nil -} - -// Push worker back to the stack -// If stack in destroy state, Push will provide 100ms window to unlock the mutex -func (stack *Stack) Push(w worker.BaseProcess) { - stack.mutex.Lock() - defer stack.mutex.Unlock() - stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w) -} - -func (stack *Stack) IsEmpty() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() - return len(stack.workers) == 0 -} - -func (stack *Stack) Pop() (worker.BaseProcess, bool) { - stack.mutex.Lock() - defer stack.mutex.Unlock() - - // do not release new stack - if stack.destroy { - return nil, true - } - - if len(stack.workers) == 0 { - return nil, false - } - - // move worker - w := stack.workers[len(stack.workers)-1] - stack.workers = stack.workers[:len(stack.workers)-1] - stack.actualNumOfWorkers-- - return w, false -} - -func (stack *Stack) FindAndRemoveByPid(pid int64) bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() - for i := 0; i < len(stack.workers); i++ { - // worker in the stack, reallocating - if stack.workers[i].Pid() == pid { - stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) - stack.actualNumOfWorkers-- - // worker found and removed - return true - } - } - // no worker with such ID - return false -} - -// Workers return copy of the workers in the stack -func (stack *Stack) Workers() []worker.BaseProcess { - stack.mutex.Lock() - defer stack.mutex.Unlock() - workersCopy := make([]worker.BaseProcess, 0, 1) - // copy - for _, v := range stack.workers { - workersCopy = append(workersCopy, v) - } - - return workersCopy -} - -func (stack *Stack) isDestroying() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() - return stack.destroy -} - -// we also have to give a chance to pool to Push worker (return it) -func (stack *Stack) Destroy(ctx context.Context) { - stack.mutex.Lock() - stack.destroy = true - stack.mutex.Unlock() - - tt := time.NewTicker(time.Millisecond * 100) - for { - select { - case <-tt.C: - stack.mutex.Lock() - // that might be one of the workers is working - if len(stack.workers) != int(stack.actualNumOfWorkers) { - stack.mutex.Unlock() - continue - } - stack.mutex.Unlock() - // unnecessary mutex, but - // just to make sure. All stack at this moment are in the stack - // Pop operation is blocked, push can't be done, since it's not possible to pop - stack.mutex.Lock() - for i := 0; i < len(stack.workers); i++ { - // set state for the stack in the stack (unused at the moment) - stack.workers[i].State().Set(internal.StateDestroyed) - // kill the worker - _ = stack.workers[i].Kill() - } - stack.mutex.Unlock() - tt.Stop() - // clear - stack.Reset() - return - } - } -} - // workerCreateFunc can be nil, but in that case, dead stack will not be replaced func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { ww := &workerWatcher{ |