summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--pkg/worker_watcher/container/interface.go13
-rw-r--r--pkg/worker_watcher/container/stack.go (renamed from pkg/worker_watcher/stack.go)79
-rw-r--r--pkg/worker_watcher/container/stack_test.go (renamed from pkg/worker_watcher/stack_test.go)5
-rw-r--r--pkg/worker_watcher/container/vec.go45
-rw-r--r--pkg/worker_watcher/interface.go27
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go228
6 files changed, 277 insertions, 120 deletions
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
new file mode 100644
index 00000000..532bace9
--- /dev/null
+++ b/pkg/worker_watcher/container/interface.go
@@ -0,0 +1,13 @@
+package container
+
+import "github.com/spiral/roadrunner/v2/pkg/worker"
+
+// Vector interface represents vector container
+type Vector interface {
+ // Enqueue used to put worker to the vector
+ Enqueue(worker.BaseProcess)
+ // Dequeue used to get worker from the vector
+ Dequeue() (worker.BaseProcess, bool)
+ // Destroy used to stop releasing the workers
+ Destroy()
+}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/container/stack.go
index 55034e41..fb8ecd3b 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/container/stack.go
@@ -1,17 +1,17 @@
-package worker_watcher //nolint:golint,stylecheck
+package container
+
import (
"context"
"runtime"
"sync"
"time"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
- workers []*worker.SyncWorkerImpl
- mutex sync.RWMutex
+ sync.RWMutex
+ workers []worker.BaseProcess
destroy bool
actualNumOfWorkers uint64
initialNumOfWorkers uint64
@@ -20,39 +20,39 @@ type Stack struct {
func NewWorkersStack(initialNumOfWorkers uint64) *Stack {
w := runtime.NumCPU()
return &Stack{
- workers: make([]*worker.SyncWorkerImpl, 0, w),
+ workers: make([]worker.BaseProcess, 0, w),
actualNumOfWorkers: 0,
initialNumOfWorkers: initialNumOfWorkers,
}
}
func (stack *Stack) Reset() {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
stack.actualNumOfWorkers = 0
stack.workers = nil
}
-// Push worker back to the stack
-// If stack in destroy state, Push will provide 100ms window to unlock the mutex
+// Push worker back to the vec
+// If vec in destroy state, Push will provide 100ms window to unlock the mutex
func (stack *Stack) Push(w worker.BaseProcess) {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
stack.actualNumOfWorkers++
- stack.workers = append(stack.workers, w.(*worker.SyncWorkerImpl))
+ stack.workers = append(stack.workers, w)
}
func (stack *Stack) IsEmpty() bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
return len(stack.workers) == 0
}
-func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+func (stack *Stack) Pop() (worker.BaseProcess, bool) {
+ stack.Lock()
+ defer stack.Unlock()
- // do not release new stack
+ // do not release new vec
if stack.destroy {
return nil, true
}
@@ -69,10 +69,10 @@ func (stack *Stack) Pop() (*worker.SyncWorkerImpl, bool) {
}
func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
for i := 0; i < len(stack.workers); i++ {
- // worker in the stack, reallocating
+ // worker in the vec, reallocating
if stack.workers[i].Pid() == pid {
stack.workers = append(stack.workers[:i], stack.workers[i+1:]...)
stack.actualNumOfWorkers--
@@ -84,12 +84,13 @@ func (stack *Stack) FindAndRemoveByPid(pid int64) bool {
return false
}
-// Workers return copy of the workers in the stack
-func (stack *Stack) Workers() []worker.SyncWorker {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
- workersCopy := make([]worker.SyncWorker, 0, 1)
+// Workers return copy of the workers in the vec
+func (stack *Stack) Workers() []worker.BaseProcess {
+ stack.Lock()
+ defer stack.Unlock()
+ workersCopy := make([]worker.BaseProcess, 0, 1)
// copy
+ // TODO pointers, copy have no sense
for _, v := range stack.workers {
if v != nil {
workersCopy = append(workersCopy, v)
@@ -100,40 +101,40 @@ func (stack *Stack) Workers() []worker.SyncWorker {
}
func (stack *Stack) isDestroying() bool {
- stack.mutex.Lock()
- defer stack.mutex.Unlock()
+ stack.Lock()
+ defer stack.Unlock()
return stack.destroy
}
// we also have to give a chance to pool to Push worker (return it)
-func (stack *Stack) Destroy(ctx context.Context) {
- stack.mutex.Lock()
+func (stack *Stack) Destroy(_ context.Context) {
+ stack.Lock()
stack.destroy = true
- stack.mutex.Unlock()
+ stack.Unlock()
tt := time.NewTicker(time.Millisecond * 500)
defer tt.Stop()
for {
select {
case <-tt.C:
- stack.mutex.Lock()
+ stack.Lock()
// that might be one of the workers is working
if stack.initialNumOfWorkers != stack.actualNumOfWorkers {
- stack.mutex.Unlock()
+ stack.Unlock()
continue
}
- stack.mutex.Unlock()
+ stack.Unlock()
// unnecessary mutex, but
- // just to make sure. All stack at this moment are in the stack
+ // just to make sure. All vec at this moment are in the vec
// Pop operation is blocked, push can't be done, since it's not possible to pop
- stack.mutex.Lock()
+ stack.Lock()
for i := 0; i < len(stack.workers); i++ {
- // set state for the stack in the stack (unused at the moment)
- stack.workers[i].State().Set(states.StateDestroyed)
+ // set state for the vec in the vec (unused at the moment)
+ stack.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
_ = stack.workers[i].Kill()
}
- stack.mutex.Unlock()
+ stack.Unlock()
// clear
stack.Reset()
return
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/container/stack_test.go
index 5287a6dc..d699664c 100644
--- a/pkg/worker_watcher/stack_test.go
+++ b/pkg/worker_watcher/container/stack_test.go
@@ -1,4 +1,5 @@
-package worker_watcher //nolint:golint,stylecheck
+package container
+
import (
"context"
"os/exec"
@@ -12,7 +13,7 @@ import (
func TestNewWorkersStack(t *testing.T) {
stack := NewWorkersStack(0)
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
- assert.Equal(t, []*worker.SyncWorkerImpl{}, stack.workers)
+ assert.Equal(t, []worker.BaseProcess{}, stack.workers)
}
func TestStack_Push(t *testing.T) {
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
new file mode 100644
index 00000000..239b01c7
--- /dev/null
+++ b/pkg/worker_watcher/container/vec.go
@@ -0,0 +1,45 @@
+package container
+
+import (
+ "sync/atomic"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
+
+type Vec struct {
+ destroy uint64
+ workers chan worker.BaseProcess
+}
+
+func NewVector(initialNumOfWorkers uint64) Vector {
+ vec := &Vec{
+ destroy: 0,
+ workers: make(chan worker.BaseProcess, initialNumOfWorkers),
+ }
+
+ return vec
+}
+
+func (v *Vec) Enqueue(w worker.BaseProcess) {
+ v.workers <- w
+}
+
+func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
+ /*
+ if *addr == old {
+ *addr = new
+ return true
+ }
+ */
+ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
+ return nil, true
+ }
+
+ w := <-v.workers
+
+ return w, false
+}
+
+func (v *Vec) Destroy() {
+ atomic.StoreUint64(&v.destroy, 1)
+}
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 927aa270..4625b7a7 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -6,25 +6,26 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker"
)
+// Watcher is an interface for the Sync workers lifecycle
type Watcher interface {
- // AddToWatch used to add stack to wait its state
- AddToWatch(workers []worker.SyncWorker) error
+ // Watch used to add workers to the container
+ Watch(workers []worker.BaseProcess) error
- // GetFreeWorker provide first free worker
- GetFreeWorker(ctx context.Context) (worker.SyncWorker, error)
+ // Get provide first free worker
+ Get(ctx context.Context) (worker.BaseProcess, error)
- // PutWorker enqueues worker back
- PushWorker(w worker.SyncWorker)
+ // Push enqueues worker back
+ Push(w worker.BaseProcess)
- // AllocateNew used to allocate new worker and put in into the WorkerWatcher
- AllocateNew() error
+ // Allocate - allocates new worker and put it into the WorkerWatcher
+ Allocate() error
- // Destroy destroys the underlying stack
+ // Destroy destroys the underlying container
Destroy(ctx context.Context)
- // WorkersList return all stack w/o removing it from internal storage
- WorkersList() []worker.SyncWorker
+ // WorkersList return all container w/o removing it from internal storage
+ List() []worker.BaseProcess
- // RemoveWorker remove worker from the stack
- RemoveWorker(wb worker.SyncWorker) error
+ // RemoveWorker remove worker from the container
+ Remove(wb worker.BaseProcess)
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index cf2e1eb7..804e4658 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,132 +3,228 @@ package worker_watcher //nolint:golint,stylecheck
import (
"context"
"sync"
+ "time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
-// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
+// workerCreateFunc can be nil, but in that case, dead container will not be replaced
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
- stack: NewWorkersStack(numWorkers),
- allocator: allocator,
- events: events,
+ container: container.NewVector(numWorkers),
+ numWorkers: numWorkers,
+ workers: make([]worker.BaseProcess, 0, numWorkers),
+ allocator: allocator,
+ events: events,
}
return ww
}
type workerWatcher struct {
- mutex sync.RWMutex
- stack *Stack
- allocator worker.Allocator
- events events.Handler
+ sync.RWMutex
+ container container.Vector
+ // used to control the Destroy stage (that all workers are in the container)
+ numWorkers uint64
+ workers []worker.BaseProcess
+ allocator worker.Allocator
+ events events.Handler
}
-func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
+func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- ww.stack.Push(workers[i])
+ ww.container.Enqueue(workers[i])
+ // add worker to watch slice
+ ww.workers = append(ww.workers, workers[i])
- go func(swc worker.SyncWorker) {
+ go func(swc worker.BaseProcess) {
ww.wait(swc)
}(workers[i])
}
return nil
}
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) {
+// return value from Get
+type get struct {
+ w worker.BaseProcess
+ err error
+}
+
+// Get is not a thread safe operation
+func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+ c := make(chan get, 1)
const op = errors.Op("worker_watcher_get_free_worker")
- // thread safe operation
- w, stop := ww.stack.Pop()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
- }
+ go func() {
+ // FAST PATH
+ // thread safe operation
+ w, stop := ww.container.Dequeue()
+ if stop {
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
+ }
+ return
+ }
- // handle worker remove state
- // in this state worker is destroyed by supervisor
- if w != nil && w.State().Value() == states.StateRemove {
- err := ww.RemoveWorker(w)
- if err != nil {
- return nil, err
+ // fast path, worker not nil and in the ReadyState
+ if w.State().Value() == worker.StateReady {
+ c <- get{
+ w,
+ nil,
+ }
+ return
}
- // try to get next
- return ww.GetFreeWorker(ctx)
- }
- // no free stack
- if w == nil {
+ // =========================================================
+ // SLOW PATH
+ _ = w.Kill()
+ // no free workers in the container
+ // try to continuously get free one
for {
select {
default:
- w, stop = ww.stack.Pop()
+ w, stop = ww.container.Dequeue()
if stop {
- return nil, errors.E(op, errors.WatcherStopped)
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
+ }
}
- if w == nil {
+
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ c <- get{
+ w,
+ nil,
+ }
+ return
+ case worker.StateWorking: // how??
+ ww.container.Enqueue(w) // put it back, let worker finish the work
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // worker doing no work because it in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
continue
}
- return w, nil
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
}
- }
+ }()
- return w, nil
+ select {
+ case r := <-c:
+ if r.err != nil {
+ return nil, r.err
+ }
+ return r.w, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed"))
+ }
}
-func (ww *workerWatcher) AllocateNew() error {
- ww.stack.mutex.Lock()
+func (ww *workerWatcher) Allocate() error {
+ ww.Lock()
const op = errors.Op("worker_watcher_allocate_new")
sw, err := ww.allocator()
if err != nil {
return errors.E(op, errors.WorkerAllocate, err)
}
+ // add worker to Wait
ww.addToWatch(sw)
- ww.stack.mutex.Unlock()
- ww.PushWorker(sw)
+ // add new worker to the workers slice (to get information about workers in parallel)
+ ww.workers = append(ww.workers, sw)
+
+ // unlock Allocate mutex
+ ww.Unlock()
+ // push the worker to the container
+ ww.Push(sw)
return nil
}
-func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
- ww.mutex.Lock()
- defer ww.mutex.Unlock()
+// Remove
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
+ ww.Lock()
+ defer ww.Unlock()
- const op = errors.Op("worker_watcher_remove_worker")
+ // set remove state
pid := wb.Pid()
- if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(states.StateRemove)
- err := wb.Kill()
- if err != nil {
- return errors.E(op, err)
+ // worker will be removed on the Get operation
+ for i := 0; i < len(ww.workers); i++ {
+ if ww.workers[i].Pid() == pid {
+ ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
+ // kill worker
+ _ = wb.Kill()
+ return
}
- return nil
}
-
- return nil
}
// O(1) operation
-func (ww *workerWatcher) PushWorker(w worker.SyncWorker) {
- ww.mutex.Lock()
- defer ww.mutex.Unlock()
- ww.stack.Push(w)
+func (ww *workerWatcher) Push(w worker.BaseProcess) {
+ ww.container.Enqueue(w)
}
-// Destroy all underlying stack (but let them to complete the task)
+// Destroy all underlying container (but let them to complete the task)
func (ww *workerWatcher) Destroy(ctx context.Context) {
- // destroy stack, we don't use ww mutex here, since we should be able to push worker
- ww.stack.Destroy(ctx)
+ // destroy container, we don't use ww mutex here, since we should be able to push worker
+ ww.Lock()
+ // do not release new workers
+ ww.container.Destroy()
+ ww.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 100)
+ defer tt.Stop()
+ for {
+ select {
+ case <-tt.C:
+ ww.Lock()
+ // that might be one of the workers is working
+ if ww.numWorkers != uint64(len(ww.workers)) {
+ ww.Unlock()
+ continue
+ }
+ ww.Unlock()
+ // unnecessary mutex, but
+ // just to make sure. All container at this moment are in the container
+ // Pop operation is blocked, push can't be done, since it's not possible to pop
+ ww.Lock()
+ for i := 0; i < len(ww.workers); i++ {
+ ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = ww.workers[i].Kill()
+ }
+ return
+ }
+ }
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []worker.SyncWorker {
- return ww.stack.Workers()
+func (ww *workerWatcher) List() []worker.BaseProcess {
+ ww.Lock()
+ defer ww.Unlock()
+
+ base := make([]worker.BaseProcess, 0, len(ww.workers))
+ for i := 0; i < len(ww.workers); i++ {
+ base = append(base, ww.workers[i])
+ }
+
+ return base
}
func (ww *workerWatcher) wait(w worker.BaseProcess) {
@@ -142,14 +238,14 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
- _ = ww.stack.FindAndRemoveByPid(w.Pid())
- err = ww.AllocateNew()
+ ww.Remove(w)
+ err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{
Event: events.EventPoolError,
@@ -158,7 +254,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
}
}
-func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) {
+func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
go func() {
ww.wait(wb)
}()