diff options
author | Valery Piashchynski <[email protected]> | 2021-01-15 14:22:02 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-15 14:22:02 +0300 |
commit | e68c8e2eb9ea705e9d846023d545410c7613de64 (patch) | |
tree | 4fe8eadc1e7af49f1f282782ac1b7f2283fb63ea /pkg | |
parent | f7d5f8fb3d14519dc89e346d6b2fc67c1837da5f (diff) |
Use uniform snake case in the configs, fix critical issue with wrong
calculation of workers in stack at the Destroy stage
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/worker_watcher/stack.go | 18 | ||||
-rw-r--r-- | pkg/worker_watcher/stack_test.go | 58 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 18 |
3 files changed, 57 insertions, 37 deletions
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index 55f1f52a..788750dc 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -10,17 +10,19 @@ import ( ) type Stack struct { - workers []worker.BaseProcess - mutex sync.RWMutex - destroy bool - actualNumOfWorkers int64 + workers []worker.BaseProcess + mutex sync.RWMutex + destroy bool + actualNumOfWorkers uint64 + initialNumOfWorkers uint64 } -func NewWorkersStack() *Stack { +func NewWorkersStack(initialNumOfWorkers uint64) *Stack { w := runtime.NumCPU() return &Stack{ - workers: make([]worker.BaseProcess, 0, w), - actualNumOfWorkers: 0, + workers: make([]worker.BaseProcess, 0, w), + actualNumOfWorkers: 0, + initialNumOfWorkers: initialNumOfWorkers, } } @@ -113,7 +115,7 @@ func (stack *Stack) Destroy(ctx context.Context) { case <-tt.C: stack.mutex.Lock() // that might be one of the workers is working - if len(stack.workers) != int(stack.actualNumOfWorkers) { + if stack.initialNumOfWorkers != stack.actualNumOfWorkers { stack.mutex.Unlock() continue } diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go index 7cdb88f1..86af2043 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/stack_test.go @@ -3,6 +3,7 @@ import ( "context" "os/exec" "testing" + "time" "github.com/spiral/roadrunner/v2/interfaces/worker" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" @@ -10,37 +11,37 @@ import ( ) func TestNewWorkersStack(t *testing.T) { - stack := NewWorkersStack() - assert.Equal(t, int64(0), stack.actualNumOfWorkers) + stack := NewWorkersStack(0) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) assert.Equal(t, []worker.BaseProcess{}, stack.workers) } func TestStack_Push(t *testing.T) { - stack := NewWorkersStack() + stack := NewWorkersStack(1) w, err := workerImpl.InitBaseWorker(&exec.Cmd{}) assert.NoError(t, err) stack.Push(w) - assert.Equal(t, int64(1), stack.actualNumOfWorkers) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) } func TestStack_Pop(t *testing.T) { - stack := NewWorkersStack() + stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") w, err := workerImpl.InitBaseWorker(cmd) assert.NoError(t, err) stack.Push(w) - assert.Equal(t, int64(1), stack.actualNumOfWorkers) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) _, _ = stack.Pop() - assert.Equal(t, int64(0), stack.actualNumOfWorkers) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } func TestStack_FindAndRemoveByPid(t *testing.T) { - stack := NewWorkersStack() + stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") w, err := workerImpl.InitBaseWorker(cmd) assert.NoError(t, err) @@ -48,27 +49,27 @@ func TestStack_FindAndRemoveByPid(t *testing.T) { assert.NoError(t, w.Start()) stack.Push(w) - assert.Equal(t, int64(1), stack.actualNumOfWorkers) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.FindAndRemoveByPid(w.Pid()) - assert.Equal(t, int64(0), stack.actualNumOfWorkers) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } func TestStack_IsEmpty(t *testing.T) { - stack := NewWorkersStack() + stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") w, err := workerImpl.InitBaseWorker(cmd) assert.NoError(t, err) stack.Push(w) - assert.Equal(t, int64(1), stack.actualNumOfWorkers) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) assert.Equal(t, false, stack.IsEmpty()) } func TestStack_Workers(t *testing.T) { - stack := NewWorkersStack() + stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") w, err := workerImpl.InitBaseWorker(cmd) assert.NoError(t, err) @@ -82,20 +83,20 @@ func TestStack_Workers(t *testing.T) { } func TestStack_Reset(t *testing.T) { - stack := NewWorkersStack() + stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") w, err := workerImpl.InitBaseWorker(cmd) assert.NoError(t, err) assert.NoError(t, w.Start()) stack.Push(w) - assert.Equal(t, int64(1), stack.actualNumOfWorkers) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) stack.Reset() - assert.Equal(t, int64(0), stack.actualNumOfWorkers) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } func TestStack_Destroy(t *testing.T) { - stack := NewWorkersStack() + stack := NewWorkersStack(1) cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") w, err := workerImpl.InitBaseWorker(cmd) assert.NoError(t, err) @@ -103,5 +104,26 @@ func TestStack_Destroy(t *testing.T) { stack.Push(w) stack.Destroy(context.Background()) - assert.Equal(t, int64(0), stack.actualNumOfWorkers) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} + +func TestStack_DestroyWithWait(t *testing.T) { + stack := NewWorkersStack(2) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := workerImpl.InitBaseWorker(cmd) + assert.NoError(t, err) + assert.NoError(t, w.Start()) + + stack.Push(w) + stack.Push(w) + assert.Equal(t, uint64(2), stack.actualNumOfWorkers) + + go func() { + wrk, _ := stack.Pop() + time.Sleep(time.Second * 3) + stack.Push(wrk) + }() + time.Sleep(time.Second) + stack.Destroy(context.Background()) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 0c086d5f..bf1f2435 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -13,23 +13,19 @@ import ( // workerCreateFunc can be nil, but in that case, dead stack will not be replaced func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { ww := &workerWatcher{ - stack: NewWorkersStack(), - allocator: allocator, - initialNumWorkers: numWorkers, - actualNumWorkers: numWorkers, - events: events, + stack: NewWorkersStack(uint64(numWorkers)), + allocator: allocator, + events: events, } return ww } type workerWatcher struct { - mutex sync.RWMutex - stack *Stack - allocator worker.Allocator - initialNumWorkers int64 - actualNumWorkers int64 - events events.Handler + mutex sync.RWMutex + stack *Stack + allocator worker.Allocator + events events.Handler } func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { |