summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/container
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-11 14:06:10 +0300
committerGitHub <[email protected]>2021-02-11 14:06:10 +0300
commit8f6cafdc0948a5ea13bf9a811b576aa4b3ef7e4a (patch)
tree92727c3ff8087597bac65eee2c26c9484c98be7f /pkg/worker_watcher/container
parent7978c59f0ed286912bfcaec81b76e54532b1a9bf (diff)
parent509abc76a0f7b88678de67843ca79d9052ad8dd6 (diff)
Merge pull request #530 from spiral/release_stabilizationv2.0.0-RC.1
stabilization(RC): rc stabilization
Diffstat (limited to 'pkg/worker_watcher/container')
-rw-r--r--pkg/worker_watcher/container/interface.go13
-rw-r--r--pkg/worker_watcher/container/stack.go143
-rw-r--r--pkg/worker_watcher/container/stack_test.go143
-rw-r--r--pkg/worker_watcher/container/vec.go45
4 files changed, 344 insertions, 0 deletions
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
new file mode 100644
index 00000000..532bace9
--- /dev/null
+++ b/pkg/worker_watcher/container/interface.go
@@ -0,0 +1,13 @@
+package container
+
+import "github.com/spiral/roadrunner/v2/pkg/worker"
+
+// Vector interface represents vector container
+type Vector interface {
+ // Enqueue used to put worker to the vector
+ Enqueue(worker.BaseProcess)
+ // Dequeue used to get worker from the vector
+ Dequeue() (worker.BaseProcess, bool)
+ // Destroy used to stop releasing the workers
+ Destroy()
+}
diff --git a/pkg/worker_watcher/container/stack.go b/pkg/worker_watcher/container/stack.go
new file mode 100644
index 00000000..fb8ecd3b
--- /dev/null
+++ b/pkg/worker_watcher/container/stack.go
@@ -0,0 +1,143 @@
+package container
+
+import (
+ "context"
+ "runtime"
+ "sync"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Stack struct {
+ sync.RWMutex
+ workers []worker.BaseProcess
+ destroy bool
+ actualNumOfWorkers uint64
+ initialNumOfWorkers uint64
+}
+
+func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
+ w := runtime.NumCPU()
+ return &Stack{
+ workers: make([]worker.BaseProcess, 0, w),
+ actualNumOfWorkers: 0,
+ initialNumOfWorkers: initialNumOfWorkers,
+ }
+}
+
+func (stack *Stack) Reset() {
+ stack.Lock()
+ defer stack.Unlock()
+ stack.actualNumOfWorkers = 0
+ stack.workers = nil
+}
+
+// Push worker back to the vec
+// If vec in destroy state, Push will provide 100ms window to unlock the mutex
+func (stack *Stack) Push(w worker.BaseProcess) {
+ stack.Lock()
+ defer stack.Unlock()
+ stack.actualNumOfWorkers++
+ stack.workers = append(stack.workers, w)
+}
+
+func (stack *Stack) IsEmpty() bool {
+ stack.Lock()
+ defer stack.Unlock()
+ return len(stack.workers) == 0
+}
+
+func (stack *Stack) Pop() (worker.BaseProcess, bool) {
+ stack.Lock()
+ defer stack.Unlock()
+
+ // do not release new vec
+ if stack.destroy {
+ return nil, true
+ }
+
+ if len(stack.workers) == 0 {
+ return nil, false
+ }
+
+ // move worker
+ w := stack.workers[len(stack.workers)-1]
+ stack.workers = stack.workers[:len(stack.workers)-1]
+ stack.actualNumOfWorkers--
+ return w, false
+}
+
+func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
+ stack.Lock()
+ defer stack.Unlock()
+ for i := 0; i < len(stack.workers); i++ {
+ // worker in the vec, reallocating
+ if stack.workers[i].Pid() == pid {
+ stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
+ stack.actualNumOfWorkers--
+ // worker found and removed
+ return true
+ }
+ }
+ // no worker with such ID
+ return false
+}
+
+// Workers return copy of the workers in the vec
+func (stack *Stack) Workers() []worker.BaseProcess {
+ stack.Lock()
+ defer stack.Unlock()
+ workersCopy := make([]worker.BaseProcess, 0, 1)
+ // copy
+ // TODO pointers, copy have no sense
+ for _, v := range stack.workers {
+ if v != nil {
+ workersCopy = append(workersCopy, v)
+ }
+ }
+
+ return workersCopy
+}
+
+func (stack *Stack) isDestroying() bool {
+ stack.Lock()
+ defer stack.Unlock()
+ return stack.destroy
+}
+
+// we also have to give a chance to pool to Push worker (return it)
+func (stack *Stack) Destroy(_ context.Context) {
+ stack.Lock()
+ stack.destroy = true
+ stack.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 500)
+ defer tt.Stop()
+ for {
+ select {
+ case <-tt.C:
+ stack.Lock()
+ // that might be one of the workers is working
+ if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
+ stack.Unlock()
+ continue
+ }
+ stack.Unlock()
+ // unnecessary mutex, but
+ // just to make sure. All vec at this moment are in the vec
+ // Pop operation is blocked, push can't be done, since it's not possible to pop
+ stack.Lock()
+ for i := 0; i < len(stack.workers); i++ {
+ // set state for the vec in the vec (unused at the moment)
+ stack.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = stack.workers[i].Kill()
+ }
+ stack.Unlock()
+ // clear
+ stack.Reset()
+ return
+ }
+ }
+}
diff --git a/pkg/worker_watcher/container/stack_test.go b/pkg/worker_watcher/container/stack_test.go
new file mode 100644
index 00000000..d699664c
--- /dev/null
+++ b/pkg/worker_watcher/container/stack_test.go
@@ -0,0 +1,143 @@
+package container
+
+import (
+ "context"
+ "os/exec"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewWorkersStack(t *testing.T) {
+ stack := NewWorkersStack(0)
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+ assert.Equal(t, []worker.BaseProcess{}, stack.workers)
+}
+
+func TestStack_Push(t *testing.T) {
+ stack := NewWorkersStack(1)
+
+ w, err := worker.InitBaseWorker(&exec.Cmd{})
+ assert.NoError(t, err)
+
+ sw := worker.From(w)
+
+ stack.Push(sw)
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+}
+
+func TestStack_Pop(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+
+ sw := worker.From(w)
+
+ stack.Push(sw)
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+
+ _, _ = stack.Pop()
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
+
+func TestStack_FindAndRemoveByPid(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+
+ stack.Push(sw)
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+
+ stack.FindAndRemoveByPid(w.Pid())
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
+
+func TestStack_IsEmpty(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+
+ sw := worker.From(w)
+ stack.Push(sw)
+
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+
+ assert.Equal(t, false, stack.IsEmpty())
+}
+
+func TestStack_Workers(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+ stack.Push(sw)
+
+ wrks := stack.Workers()
+ assert.Equal(t, 1, len(wrks))
+ assert.Equal(t, w.Pid(), wrks[0].Pid())
+}
+
+func TestStack_Reset(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+ stack.Push(sw)
+
+ assert.Equal(t, uint64(1), stack.actualNumOfWorkers)
+ stack.Reset()
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
+
+func TestStack_Destroy(t *testing.T) {
+ stack := NewWorkersStack(1)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+ stack.Push(sw)
+
+ stack.Destroy(context.Background())
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
+
+func TestStack_DestroyWithWait(t *testing.T) {
+ stack := NewWorkersStack(2)
+ cmd := exec.Command("php", "../tests/client.php", "echo", "pipes")
+ w, err := worker.InitBaseWorker(cmd)
+ assert.NoError(t, err)
+ assert.NoError(t, w.Start())
+
+ sw := worker.From(w)
+ stack.Push(sw)
+ stack.Push(sw)
+ assert.Equal(t, uint64(2), stack.actualNumOfWorkers)
+
+ go func() {
+ wrk, _ := stack.Pop()
+ time.Sleep(time.Second * 3)
+ stack.Push(wrk)
+ }()
+ time.Sleep(time.Second)
+ stack.Destroy(context.Background())
+ assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
+}
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
new file mode 100644
index 00000000..239b01c7
--- /dev/null
+++ b/pkg/worker_watcher/container/vec.go
@@ -0,0 +1,45 @@
+package container
+
+import (
+ "sync/atomic"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Vec struct {
+ destroy uint64
+ workers chan worker.BaseProcess
+}
+
+func NewVector(initialNumOfWorkers uint64) Vector {
+ vec := &Vec{
+ destroy: 0,
+ workers: make(chan worker.BaseProcess, initialNumOfWorkers),
+ }
+
+ return vec
+}
+
+func (v *Vec) Enqueue(w worker.BaseProcess) {
+ v.workers <- w
+}
+
+func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
+ /*
+ if *addr == old {
+ *addr = new
+ return true
+ }
+ */
+ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ return nil, true
+ }
+
+ w := <-v.workers
+
+ return w, false
+}
+
+func (v *Vec) Destroy() {
+ atomic.StoreUint64(&v.destroy, 1)
+}