summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--pkg/worker_watcher/container/stack.go143
-rw-r--r--pkg/worker_watcher/container/stack_test.go143
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go73
3 files changed, 37 insertions, 322 deletions
diff --git a/pkg/worker_watcher/container/stack.go b/pkg/worker_watcher/container/stack.go
deleted file mode 100644
index fb8ecd3b..00000000
--- a/pkg/worker_watcher/container/stack.go
+++ /dev/null
@@ -1,143 +0,0 @@
-package container
-
-import (
- "context"
- "runtime"
- "sync"
- "time"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-type Stack struct {
- sync.RWMutex
- workers []worker.BaseProcess
- destroy bool
- actualNumOfWorkers uint64
- initialNumOfWorkers uint64
-}
-
-func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
- w := runtime.NumCPU()
- return &Stack{
- workers: make([]worker.BaseProcess, 0, w),
- actualNumOfWorkers: 0,
- initialNumOfWorkers: initialNumOfWorkers,
- }
-}
-
-func (stack *Stack) Reset() {
- stack.Lock()
- defer stack.Unlock()
- stack.actualNumOfWorkers = 0
- stack.workers = nil
-}
-
-// Push worker back to the vec
-// If vec in destroy state, Push will provide 100ms window to unlock the mutex
-func (stack *Stack) Push(w worker.BaseProcess) {
- stack.Lock()
- defer stack.Unlock()
- stack.actualNumOfWorkers++
- stack.workers = append(stack.workers, w)
-}
-
-func (stack *Stack) IsEmpty() bool {
- stack.Lock()
- defer stack.Unlock()
- return len(stack.workers) == 0
-}
-
-func (stack *Stack) Pop() (worker.BaseProcess, bool) {
- stack.Lock()
- defer stack.Unlock()
-
- // do not release new vec
- if stack.destroy {
- return nil, true
- }
-
- if len(stack.workers) == 0 {
- return nil, false
- }
-
- // move worker
- w := stack.workers[len(stack.workers)-1]
- stack.workers = stack.workers[:len(stack.workers)-1]
- stack.actualNumOfWorkers--
- return w, false
-}
-
-func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
- stack.Lock()
- defer stack.Unlock()
- for i := 0; i < len(stack.workers); i++ {
- // worker in the vec, reallocating
- if stack.workers[i].Pid() == pid {
- stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
- stack.actualNumOfWorkers--
- // worker found and removed
- return true
- }
- }
- // no worker with such ID
- return false
-}
-
-// Workers return copy of the workers in the vec
-func (stack *Stack) Workers() []worker.BaseProcess {
- stack.Lock()
- defer stack.Unlock()
- workersCopy := make([]worker.BaseProcess, 0, 1)
- // copy
- // TODO pointers, copy have no sense
- for _, v := range stack.workers {
- if v != nil {
- workersCopy = append(workersCopy, v)
- }
- }
-
- return workersCopy
-}
-
-func (stack *Stack) isDestroying() bool {
- stack.Lock()
- defer stack.Unlock()
- return stack.destroy
-}
-
-// we also have to give a chance to pool to Push worker (return it)
-func (stack *Stack) Destroy(_ context.Context) {
- stack.Lock()
- stack.destroy = true
- stack.Unlock()
-
- tt := time.NewTicker(time.Millisecond * 500)
- defer tt.Stop()
- for {
- select {
- case <-tt.C:
- stack.Lock()
- // that might be one of the workers is working
- if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
- stack.Unlock()
- continue
- }
- stack.Unlock()
- // unnecessary mutex, but
- // just to make sure. All vec at this moment are in the vec
- // Pop operation is blocked, push can't be done, since it's not possible to pop
- stack.Lock()
- for i := 0; i < len(stack.workers); i++ {
- // set state for the vec in the vec (unused at the moment)
- stack.workers[i].State().Set(worker.StateDestroyed)
- // kill the worker
- _ = stack.workers[i].Kill()
- }
- stack.Unlock()
- // clear
- stack.Reset()
- return
- }
- }
-}
diff --git a/pkg/worker_watcher/container/stack_test.go b/pkg/worker_watcher/container/stack_test.go
deleted file mode 100644
index d699664c..00000000
--- a/pkg/worker_watcher/container/stack_test.go
+++ /dev/null
@@ -1,143 +0,0 @@
-package container
-
-import (
- "context"
- "os/exec"
- "testing"
- "time"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/stretchr/testify/assert"
-)
-
-func TestNewWorkersStack(t *testing.T) {
- stack := NewWorkersStack(0)
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
- assert.Equal(t, []worker.BaseProcess{}, stack.workers)
-}
-
-func TestStack_Push(t *testing.T) {
- stack := NewWorkersStack(1)
-
- w, err := worker.InitBaseWorker(&exec.Cmd{})
- assert.NoError(t, err)
-
- sw := worker.From(w)
-
- stack.Push(sw)
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
-}
-
-func TestStack_Pop(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
-
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
-
- sw := worker.From(w)
-
- stack.Push(sw)
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
-
- _, _ = stack.Pop()
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
-
-func TestStack_FindAndRemoveByPid(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
-
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
-
- stack.Push(sw)
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
-
- stack.FindAndRemoveByPid(w.Pid())
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
-
-func TestStack_IsEmpty(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
-
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
-
- sw := worker.From(w)
- stack.Push(sw)
-
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
-
- assert.Equal(t, false, stack.IsEmpty())
-}
-
-func TestStack_Workers(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
- stack.Push(sw)
-
- wrks := stack.Workers()
- assert.Equal(t, 1, len(wrks))
- assert.Equal(t, w.Pid(), wrks[0].Pid())
-}
-
-func TestStack_Reset(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
- stack.Push(sw)
-
- assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
- stack.Reset()
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
-
-func TestStack_Destroy(t *testing.T) {
- stack := NewWorkersStack(1)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
- stack.Push(sw)
-
- stack.Destroy(context.Background())
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
-
-func TestStack_DestroyWithWait(t *testing.T) {
- stack := NewWorkersStack(2)
- cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
- w, err := worker.InitBaseWorker(cmd)
- assert.NoError(t, err)
- assert.NoError(t, w.Start())
-
- sw := worker.From(w)
- stack.Push(sw)
- stack.Push(sw)
- assert.Equal(t, uint64(2), stack.actualNumOfWorkers)
-
- go func() {
- wrk, _ := stack.Pop()
- time.Sleep(time.Second * 3)
- stack.Push(wrk)
- }()
- time.Sleep(time.Second)
- stack.Destroy(context.Background())
- assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
-}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index cc8cc2b6..a6dfe43e 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -79,47 +79,44 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
}
// =========================================================
// SLOW PATH
- _ = w.Kill()
+ _ = w.Kill() // how the worker get here???????
// no free workers in the container
// try to continuously get free one
for {
- select {
- default:
- w, stop = ww.container.Dequeue()
- if stop {
- c <- get{
- nil,
- errors.E(op, errors.WatcherStopped),
- }
+ w, stop = ww.container.Dequeue()
+ if stop {
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
}
+ }
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- c <- get{
- w,
- nil,
- }
- return
- case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateStopping:
- // worker doing no work because it in the container
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
- continue
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ c <- get{
+ w,
+ nil,
}
+ return
+ case worker.StateWorking: // how??
+ ww.container.Enqueue(w) // put it back, let worker finish the work
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // worker doing no work because it in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
}
}
}()
@@ -177,6 +174,10 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
// O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
+ if w.State().Value() != worker.StateReady {
+ _ = w.Kill()
+ return
+ }
ww.container.Enqueue(w)
}
@@ -190,7 +191,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
tt := time.NewTicker(time.Millisecond * 100)
defer tt.Stop()
- for {
+ for { //nolint:gosimple
select {
case <-tt.C:
ww.Lock()