diff options
author | Valery Piashchynski <[email protected]> | 2021-02-11 14:06:10 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-02-11 14:06:10 +0300 |
commit | 8f6cafdc0948a5ea13bf9a811b576aa4b3ef7e4a (patch) | |
tree | 92727c3ff8087597bac65eee2c26c9484c98be7f /pkg/worker_watcher/container | |
parent | 7978c59f0ed286912bfcaec81b76e54532b1a9bf (diff) | |
parent | 509abc76a0f7b88678de67843ca79d9052ad8dd6 (diff) |
Merge pull request #530 from spiral/release_stabilizationv2.0.0-RC.1
stabilization(RC): rc stabilization
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 13 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack.go | 143 | ||||
-rw-r--r-- | pkg/worker_watcher/container/stack_test.go | 143 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 45 |
4 files changed, 344 insertions, 0 deletions
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go new file mode 100644 index 00000000..532bace9 --- /dev/null +++ b/pkg/worker_watcher/container/interface.go @@ -0,0 +1,13 @@ +package container + +import "github.com/spiral/roadrunner/v2/pkg/worker" + +// Vector interface represents vector container +type Vector interface { + // Enqueue used to put worker to the vector + Enqueue(worker.BaseProcess) + // Dequeue used to get worker from the vector + Dequeue() (worker.BaseProcess, bool) + // Destroy used to stop releasing the workers + Destroy() +} diff --git a/pkg/worker_watcher/container/stack.go b/pkg/worker_watcher/container/stack.go new file mode 100644 index 00000000..fb8ecd3b --- /dev/null +++ b/pkg/worker_watcher/container/stack.go @@ -0,0 +1,143 @@ +package container + +import ( + "context" + "runtime" + "sync" + "time" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Stack struct { + sync.RWMutex + workers []worker.BaseProcess + destroy bool + actualNumOfWorkers uint64 + initialNumOfWorkers uint64 +} + +func NewWorkersStack(initialNumOfWorkers uint64) *Stack { + w := runtime.NumCPU() + return &Stack{ + workers: make([]worker.BaseProcess, 0, w), + actualNumOfWorkers: 0, + initialNumOfWorkers: initialNumOfWorkers, + } +} + +func (stack *Stack) Reset() { + stack.Lock() + defer stack.Unlock() + stack.actualNumOfWorkers = 0 + stack.workers = nil +} + +// Push worker back to the vec +// If vec in destroy state, Push will provide 100ms window to unlock the mutex +func (stack *Stack) Push(w worker.BaseProcess) { + stack.Lock() + defer stack.Unlock() + stack.actualNumOfWorkers++ + stack.workers = append(stack.workers, w) +} + +func (stack *Stack) IsEmpty() bool { + stack.Lock() + defer stack.Unlock() + return len(stack.workers) == 0 +} + +func (stack *Stack) Pop() (worker.BaseProcess, bool) { + stack.Lock() + defer stack.Unlock() + + // do not release new vec + if stack.destroy { + return nil, true + } + + if len(stack.workers) == 0 { + return nil, false + } + + // move worker + w := stack.workers[len(stack.workers)-1] + stack.workers = stack.workers[:len(stack.workers)-1] + stack.actualNumOfWorkers-- + return w, false +} + +func (stack *Stack) FindAndRemoveByPid(pid int64) bool { + stack.Lock() + defer stack.Unlock() + for i := 0; i < len(stack.workers); i++ { + // worker in the vec, reallocating + if stack.workers[i].Pid() == pid { + stack.workers = append(stack.workers[:i], stack.workers[i+1:]...) + stack.actualNumOfWorkers-- + // worker found and removed + return true + } + } + // no worker with such ID + return false +} + +// Workers return copy of the workers in the vec +func (stack *Stack) Workers() []worker.BaseProcess { + stack.Lock() + defer stack.Unlock() + workersCopy := make([]worker.BaseProcess, 0, 1) + // copy + // TODO pointers, copy have no sense + for _, v := range stack.workers { + if v != nil { + workersCopy = append(workersCopy, v) + } + } + + return workersCopy +} + +func (stack *Stack) isDestroying() bool { + stack.Lock() + defer stack.Unlock() + return stack.destroy +} + +// we also have to give a chance to pool to Push worker (return it) +func (stack *Stack) Destroy(_ context.Context) { + stack.Lock() + stack.destroy = true + stack.Unlock() + + tt := time.NewTicker(time.Millisecond * 500) + defer tt.Stop() + for { + select { + case <-tt.C: + stack.Lock() + // that might be one of the workers is working + if stack.initialNumOfWorkers != stack.actualNumOfWorkers { + stack.Unlock() + continue + } + stack.Unlock() + // unnecessary mutex, but + // just to make sure. All vec at this moment are in the vec + // Pop operation is blocked, push can't be done, since it's not possible to pop + stack.Lock() + for i := 0; i < len(stack.workers); i++ { + // set state for the vec in the vec (unused at the moment) + stack.workers[i].State().Set(worker.StateDestroyed) + // kill the worker + _ = stack.workers[i].Kill() + } + stack.Unlock() + // clear + stack.Reset() + return + } + } +} diff --git a/pkg/worker_watcher/container/stack_test.go b/pkg/worker_watcher/container/stack_test.go new file mode 100644 index 00000000..d699664c --- /dev/null +++ b/pkg/worker_watcher/container/stack_test.go @@ -0,0 +1,143 @@ +package container + +import ( + "context" + "os/exec" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/stretchr/testify/assert" +) + +func TestNewWorkersStack(t *testing.T) { + stack := NewWorkersStack(0) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) + assert.Equal(t, []worker.BaseProcess{}, stack.workers) +} + +func TestStack_Push(t *testing.T) { + stack := NewWorkersStack(1) + + w, err := worker.InitBaseWorker(&exec.Cmd{}) + assert.NoError(t, err) + + sw := worker.From(w) + + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) +} + +func TestStack_Pop(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + + sw := worker.From(w) + + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) + + _, _ = stack.Pop() + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} + +func TestStack_FindAndRemoveByPid(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + + assert.NoError(t, w.Start()) + + sw := worker.From(w) + + stack.Push(sw) + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) + + stack.FindAndRemoveByPid(w.Pid()) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} + +func TestStack_IsEmpty(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + + sw := worker.From(w) + stack.Push(sw) + + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) + + assert.Equal(t, false, stack.IsEmpty()) +} + +func TestStack_Workers(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + assert.NoError(t, w.Start()) + + sw := worker.From(w) + stack.Push(sw) + + wrks := stack.Workers() + assert.Equal(t, 1, len(wrks)) + assert.Equal(t, w.Pid(), wrks[0].Pid()) +} + +func TestStack_Reset(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + assert.NoError(t, w.Start()) + + sw := worker.From(w) + stack.Push(sw) + + assert.Equal(t, uint64(1), stack.actualNumOfWorkers) + stack.Reset() + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} + +func TestStack_Destroy(t *testing.T) { + stack := NewWorkersStack(1) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + assert.NoError(t, w.Start()) + + sw := worker.From(w) + stack.Push(sw) + + stack.Destroy(context.Background()) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} + +func TestStack_DestroyWithWait(t *testing.T) { + stack := NewWorkersStack(2) + cmd := exec.Command("php", "../tests/client.php", "echo", "pipes") + w, err := worker.InitBaseWorker(cmd) + assert.NoError(t, err) + assert.NoError(t, w.Start()) + + sw := worker.From(w) + stack.Push(sw) + stack.Push(sw) + assert.Equal(t, uint64(2), stack.actualNumOfWorkers) + + go func() { + wrk, _ := stack.Pop() + time.Sleep(time.Second * 3) + stack.Push(wrk) + }() + time.Sleep(time.Second) + stack.Destroy(context.Background()) + assert.Equal(t, uint64(0), stack.actualNumOfWorkers) +} diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go new file mode 100644 index 00000000..239b01c7 --- /dev/null +++ b/pkg/worker_watcher/container/vec.go @@ -0,0 +1,45 @@ +package container + +import ( + "sync/atomic" + + "github.com/spiral/roadrunner/v2/pkg/worker" +) + +type Vec struct { + destroy uint64 + workers chan worker.BaseProcess +} + +func NewVector(initialNumOfWorkers uint64) Vector { + vec := &Vec{ + destroy: 0, + workers: make(chan worker.BaseProcess, initialNumOfWorkers), + } + + return vec +} + +func (v *Vec) Enqueue(w worker.BaseProcess) { + v.workers <- w +} + +func (v *Vec) Dequeue() (worker.BaseProcess, bool) { + /* + if *addr == old { + *addr = new + return true + } + */ + if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { + return nil, true + } + + w := <-v.workers + + return w, false +} + +func (v *Vec) Destroy() { + atomic.StoreUint64(&v.destroy, 1) +} |