diff options
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r-- | pkg/worker_watcher/container/stack.go | 143 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack_test.go | 143 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 73 |
3 files changed, 37 insertions, 322 deletions
diff --git a/pkg/worker_watcher/container/stack.go b/pkg/worker_watcher/container/stack.go deleted file mode 100644 index fb8ecd3b..00000000 --- a/pkg/worker_watcher/container/stack.go +++ /dev/null @@ -1,143 +0,0 @@ -package container - -import ( - "context" - "runtime" - "sync" - "time" - - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -type Stack struct { - sync.RWMutex - workers []worker.BaseProcess - destroy bool - actualNumOfWorkers uint64 - initialNumOfWorkers uint64 -} - -func NewWorkersStack(initialNumOfWorkers uint64) *Stack { - w := runtime.NumCPU() - return &Stack{ - workers: make([]worker.BaseProcess, 0, w), - actualNumOfWorkers: 0, - initialNumOfWorkers: initialNumOfWorkers, - } -} - -func (stack *Stack) Reset() { - stack.Lock() - defer stack.Unlock() - stack.actualNumOfWorkers = 0 - stack.workers = nil -} - -// Push worker back to the vec -// If vec in destroy state, Push will provide 100ms window to unlock the mutex -func (stack *Stack) Push(w worker.BaseProcess) { - stack.Lock() - defer stack.Unlock() - stack.actualNumOfWorkers++ - stack.workers = append(stack.workers, w) -} - -func (stack *Stack) IsEmpty() bool { - stack.Lock() - defer stack.Unlock() - return len(stack.workers) == 0 -} - -func (stack *Stack) Pop() (worker.BaseProcess, bool) { - stack.Lock() - defer stack.Unlock() - - // do not release new vec - if stack.destroy { - return nil, true - } - - if len(stack.workers) == 0 { - return nil, false - } - - // move worker - w := stack.workers[len(stack.workers)-1] - stack.workers = stack.workers[:len(stack.workers)-1] - stack.actualNumOfWorkers-- - return w, false -} - -func (stack *Stack) FindAndRemoveByPid(pid int64) bool { - stack.Lock() - defer stack.Unlock() - for i := 0; i < len(stack.workers); i++ { - // worker in the vec, reallocating - if stack.workers[i].Pid() == pid { - stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) - stack.actualNumOfWorkers-- - // worker found and removed - return true - } - } - // no worker with such ID - return false -} - -// Workers return copy of the workers in the vec -func (stack *Stack) Workers() []worker.BaseProcess { - stack.Lock() - defer stack.Unlock() - workersCopy := make([]worker.BaseProcess, 0, 1) - // copy - // TODO pointers, copy have no sense - for _, v := range stack.workers { - if v != nil { - workersCopy = append(workersCopy, v) - } - } - - return workersCopy -} - -func (stack *Stack) isDestroying() bool { - stack.Lock() - defer stack.Unlock() - return stack.destroy -} - -// we also have to give a chance to pool to Push worker (return it) -func (stack *Stack) Destroy(_ context.Context) { - stack.Lock() - stack.destroy = true - stack.Unlock() - - tt := time.NewTicker(time.Millisecond * 500) - defer tt.Stop() - for { - select { - case <-tt.C: - stack.Lock() - // that might be one of the workers is working - if stack.initialNumOfWorkers != stack.actualNumOfWorkers { - stack.Unlock() - continue - } - stack.Unlock() - // unnecessary mutex, but - // just to make sure. All vec at this moment are in the vec - // Pop operation is blocked, push can't be done, since it's not possible to pop - stack.Lock() - for i := 0; i < len(stack.workers); i++ { - // set state for the vec in the vec (unused at the moment) - stack.workers[i].State().Set(worker.StateDestroyed) - // kill the worker - _ = stack.workers[i].Kill() - } - stack.Unlock() - // clear - stack.Reset() - return - } - } -} diff --git a/pkg/worker_watcher/container/stack_test.go b/pkg/worker_watcher/container/stack_test.go deleted file mode 100644 index d699664c..00000000 --- a/pkg/worker_watcher/container/stack_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package container - -import ( - "context" - "os/exec" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/stretchr/testify/assert" -) - -func TestNewWorkersStack(t *testing.T) { - stack := NewWorkersStack(0) - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) - assert.Equal(t, []worker.BaseProcess{}, stack.workers) -} - -func TestStack_Push(t *testing.T) { - stack := NewWorkersStack(1) - - w, err := worker.InitBaseWorker(&exec.Cmd{}) - assert.NoError(t, err) - - sw := worker.From(w) - - stack.Push(sw) - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) -} - -func TestStack_Pop(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - - sw := worker.From(w) - - stack.Push(sw) - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) - - _, _ = stack.Pop() - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} - -func TestStack_FindAndRemoveByPid(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - - assert.NoError(t, w.Start()) - - sw := worker.From(w) - - stack.Push(sw) - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) - - stack.FindAndRemoveByPid(w.Pid()) - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} - -func TestStack_IsEmpty(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - - sw := worker.From(w) - stack.Push(sw) - - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) - - assert.Equal(t, false, stack.IsEmpty()) -} - -func TestStack_Workers(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - assert.NoError(t, w.Start()) - - sw := worker.From(w) - stack.Push(sw) - - wrks := stack.Workers() - assert.Equal(t, 1, len(wrks)) - assert.Equal(t, w.Pid(), wrks[0].Pid()) -} - -func TestStack_Reset(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - assert.NoError(t, w.Start()) - - sw := worker.From(w) - stack.Push(sw) - - assert.Equal(t, uint64(1), stack.actualNumOfWorkers) - stack.Reset() - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} - -func TestStack_Destroy(t *testing.T) { - stack := NewWorkersStack(1) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - assert.NoError(t, w.Start()) - - sw := worker.From(w) - stack.Push(sw) - - stack.Destroy(context.Background()) - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} - -func TestStack_DestroyWithWait(t *testing.T) { - stack := NewWorkersStack(2) - cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") - w, err := worker.InitBaseWorker(cmd) - assert.NoError(t, err) - assert.NoError(t, w.Start()) - - sw := worker.From(w) - stack.Push(sw) - stack.Push(sw) - assert.Equal(t, uint64(2), stack.actualNumOfWorkers) - - go func() { - wrk, _ := stack.Pop() - time.Sleep(time.Second * 3) - stack.Push(wrk) - }() - time.Sleep(time.Second) - stack.Destroy(context.Background()) - assert.Equal(t, uint64(0), stack.actualNumOfWorkers) -} diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index cc8cc2b6..a6dfe43e 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -79,47 +79,44 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { } // ========================================================= // SLOW PATH - _ = w.Kill() + _ = w.Kill() // how the worker get here??????? // no free workers in the container // try to continuously get free one for { - select { - default: - w, stop = ww.container.Dequeue() - if stop { - c <- get{ - nil, - errors.E(op, errors.WatcherStopped), - } + w, stop = ww.container.Dequeue() + if stop { + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), } + } - switch w.State().Value() { - // return only workers in the Ready state - // check first - case worker.StateReady: - c <- get{ - w, - nil, - } - return - case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work - continue - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateStopping: - // worker doing no work because it in the container - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // try to get new worker - continue + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + c <- get{ + w, + nil, } + return + case worker.StateWorking: // how?? + ww.container.Enqueue(w) // put it back, let worker finish the work + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue } } }() @@ -177,6 +174,10 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { // O(1) operation func (ww *workerWatcher) Push(w worker.BaseProcess) { + if w.State().Value() != worker.StateReady { + _ = w.Kill() + return + } ww.container.Enqueue(w) } @@ -190,7 +191,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { tt := time.NewTicker(time.Millisecond * 100) defer tt.Stop() - for { + for { //nolint:gosimple select { case <-tt.C: ww.Lock() |