author    Valery Piashchynski <[email protected]>  2021-02-10 20:00:22 +0300
committer Valery Piashchynski <[email protected]>  2021-02-10 20:00:22 +0300
commit    ae3dd0c3672217be0b3fb4042ef650477fba108b (patch)
tree      da5b08308e5aff50a102f41e254ee3620d41550e /pkg/worker_watcher/worker_watcher.go
parent    da64d9fbab7d73e203e7dbbb9503f4d422feaab0 (diff)
Rewrite container for the workers
Update tests
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go  240
1 file changed, 153 insertions(+), 87 deletions(-)
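For context before the diff: the commit replaces the old LIFO worker stack with a container type exposing Enqueue/Dequeue semantics. Below is a minimal sketch of what such a channel-backed container could look like. It is an assumption inferred from the method names used in the diff (NewVector, Enqueue, Dequeue, Destroy), not the actual pkg/worker_watcher/container implementation.

// Illustrative sketch only: a channel-backed worker container with the
// surface used by the new worker_watcher. Not the real implementation.
package container

import "github.com/spiral/roadrunner/v2/pkg/worker"

type Vector struct {
	workers chan worker.BaseProcess // buffered queue of free workers
	destroy chan struct{}           // closed once Destroy is called
}

func NewVector(size uint64) Vector {
	return Vector{
		workers: make(chan worker.BaseProcess, size),
		destroy: make(chan struct{}),
	}
}

// Enqueue puts a worker back into the container.
func (v Vector) Enqueue(w worker.BaseProcess) {
	v.workers <- w
}

// Dequeue blocks until a worker is available or the container is
// destroyed; the bool mirrors the `stop` flag used in the diff below.
func (v Vector) Dequeue() (worker.BaseProcess, bool) {
	select {
	case w := <-v.workers:
		return w, false
	case <-v.destroy:
		return nil, true
	}
}

// Destroy marks the container as stopped; pending and future Dequeue
// calls return stop == true.
func (v Vector) Destroy() {
	close(v.destroy)
}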
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 2380c190..3e0633a3 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,33 +3,42 @@ package worker_watcher //nolint:golint,stylecheck
import (
"context"
"sync"
+ "time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
-// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
+// workerCreateFunc can be nil, but in that case, dead workers will not be replaced
func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
- stack: NewWorkersStack(numWorkers),
- allocator: allocator,
- events: events,
+ container: container.NewVector(numWorkers),
+ numWorkers: numWorkers,
+ workers: make([]worker.BaseProcess, 0, numWorkers),
+ allocator: allocator,
+ events: events,
}
return ww
}
type workerWatcher struct {
- mutex sync.RWMutex
- stack *Stack
- allocator worker.Allocator
- events events.Handler
+ sync.RWMutex
+ container container.Vector
+ // used during the Destroy stage to verify that all workers are back in the container
+ numWorkers uint64
+ workers []worker.BaseProcess
+ allocator worker.Allocator
+ events events.Handler
}
func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- ww.stack.Push(workers[i])
+ ww.container.Enqueue(workers[i])
+ // add worker to watch slice
+ ww.workers = append(ww.workers, workers[i])
go func(swc worker.BaseProcess) {
ww.wait(swc)
@@ -38,75 +47,96 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
return nil
}
+// return value from Get
+type get struct {
+ w worker.BaseProcess
+ err error
+}
+
// Get is not a thread safe operation
func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
+ c := make(chan get, 1)
const op = errors.Op("worker_watcher_get_free_worker")
- // FAST PATH
- // thread safe operation
- w, stop := ww.stack.Pop()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
- }
-
- // fast path, worker not nil and in the ReadyState
- if w != nil && w.State().Value() == worker.StateReady {
- return w, nil
- }
- // =========================================================
- // SLOW PATH
- // Put worker back (no matter it's state, it will be killed next)
- if w != nil {
- ww.stack.Push(w)
- }
- // no free workers in the stack
- // try to continuously get free one
- for {
- select {
- default:
- w, stop = ww.stack.Pop()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
+ go func() {
+ // FAST PATH
+ // thread safe operation
+ w, stop := ww.container.Dequeue()
+ if stop {
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
}
- if w == nil {
- continue
+ return
+ }
+
+ // fast path, worker not nil and in the ReadyState
+ if w.State().Value() == worker.StateReady {
+ c <- get{
+ w,
+ nil,
}
+ return
+ }
+ // =========================================================
+ // SLOW PATH
+ _ = w.Kill()
+ // no free workers in the container
+ // keep trying to get a free one
+ for {
+ select {
+ default:
+ w, stop = ww.container.Dequeue()
+ if stop {
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
+ }
+ return
+ }
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- return w, nil
- case worker.StateRemove:
- err := ww.Remove(w)
- if err != nil {
- return nil, errors.E(op, err)
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ c <- get{
+ w,
+ nil,
+ }
+ return
+ case worker.StateWorking: // should not happen: a dequeued worker is still working; put it back
+ ww.container.Enqueue(w)
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // the worker is doing no work because it was sitting in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
}
- // try to get next
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateWorking, // ??? how
- worker.StateStopping:
- // worker doing no work because it in the stack
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
- continue
}
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
+ }()
+
+ select {
+ case r := <-c:
+ if r.err != nil {
+ return nil, r.err
+ }
+ return r.w, nil
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceeded"))
}
}
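The rewritten Get above moves the blocking dequeue loop into a goroutine and races it against ctx.Done() through a result channel; the buffer of one lets the goroutine complete its send even after the caller has given up. A self-contained sketch of the same pattern follows; all names here (getWithContext, result, src) are illustrative, not part of the watcher.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type result struct {
	val int
	err error
}

// getWithContext wraps a blocking receive so the caller can abandon
// the wait via ctx; the buffered channel (capacity 1) guarantees the
// goroutine's send never blocks, even after a timeout.
func getWithContext(ctx context.Context, src <-chan int) (int, error) {
	c := make(chan result, 1)
	go func() {
		v, ok := <-src
		if !ok {
			c <- result{0, errors.New("source closed")}
			return
		}
		c <- result{v, nil}
	}()

	select {
	case r := <-c:
		return r.val, r.err
	case <-ctx.Done():
		return 0, fmt.Errorf("no value available: %w", ctx.Err())
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	src := make(chan int) // never delivers, so the timeout branch wins
	_, err := getWithContext(ctx, src)
	fmt.Println(err) // no value available: context deadline exceeded
}

One consequence of this shape, which applies to the diff as well: a value (here, a worker) that arrives just after the timeout stays parked in the buffered channel rather than being returned to the caller.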
func (ww *workerWatcher) Allocate() error {
- ww.mutex.Lock()
+ ww.Lock()
const op = errors.Op("worker_watcher_allocate_new")
sw, err := ww.allocator()
if err != nil {
@@ -114,47 +144,83 @@ func (ww *workerWatcher) Allocate() error {
}
ww.addToWatch(sw)
- ww.mutex.Unlock()
- ww.Push(sw)
+ ww.workers = append(ww.workers, sw)
+
+ ww.Unlock()
+ ww.Push(sw)
return nil
}
// Remove
-func (ww *workerWatcher) Remove(wb worker.BaseProcess) error {
- ww.mutex.Lock()
- defer ww.mutex.Unlock()
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
+ ww.Lock()
+ defer ww.Unlock()
- const op = errors.Op("worker_watcher_remove_worker")
// set remove state
- wb.State().Set(worker.StateRemove)
- if ww.stack.FindAndRemoveByPid(wb.Pid()) {
- err := wb.Kill()
- if err != nil {
- return errors.E(op, err)
+ pid := wb.Pid()
+
+ // worker will be removed on the Get operation
+ for i := 0; i < len(ww.workers); i++ {
+ if ww.workers[i].Pid() == pid {
+ ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
+ // kill worker
+ _ = wb.Kill()
+ return
}
- return nil
}
-
- return nil
}
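Remove above uses the standard order-preserving slice-deletion idiom, append(s[:i], s[i+1:]...). A tiny standalone illustration (the helper removeAt is hypothetical, not from the diff):

package main

import "fmt"

// removeAt deletes the element at index i, preserving order. It is
// O(n) and mutates the backing array, exactly as in Remove above.
func removeAt(s []int, i int) []int {
	return append(s[:i], s[i+1:]...)
}

func main() {
	pids := []int{100, 200, 300}
	fmt.Println(removeAt(pids, 1)) // [100 300]
}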
// O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
- ww.mutex.Lock()
- defer ww.mutex.Unlock()
- ww.stack.Push(w)
+ ww.container.Enqueue(w)
}
-// Destroy all underlying stack (but let them to complete the task)
+// Destroy all workers in the underlying container (but let them complete the current task)
func (ww *workerWatcher) Destroy(ctx context.Context) {
- // destroy stack, we don't use ww mutex here, since we should be able to push worker
- ww.stack.Destroy(ctx)
+ // destroy the container under the mutex so no new workers are released mid-destroy
+ ww.Lock()
+ // do not release new workers
+ ww.container.Destroy()
+ ww.Unlock()
+
+ tt := time.NewTicker(time.Millisecond * 500)
+ defer tt.Stop()
+ for {
+ select {
+ case <-tt.C:
+ ww.Lock()
+ // some of the workers might still be working
+ if ww.numWorkers != uint64(len(ww.workers)) {
+ ww.Unlock()
+ continue
+ }
+ ww.Unlock()
+ // the mutex is not strictly necessary here, but take it just to be safe:
+ // at this moment all workers are back in the container,
+ // Dequeue is blocked, and no Enqueue can follow, since nothing can be dequeued
+ ww.Lock()
+ for i := 0; i < len(ww.workers); i++ {
+ ww.workers[i].State().Set(worker.StateDestroyed)
+ // kill the worker
+ _ = ww.workers[i].Kill()
+ }
+ return
+ }
+ }
}
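Destroy above stops the container, then polls on a 500 ms ticker until the watch list shows every worker accounted for before killing them; note that in the diff the ctx parameter is never consulted inside the loop. A generic sketch of such a drain-and-stop loop, with a ctx escape hatch added purely for illustration (drainAndStop and its parameters are hypothetical names):

package main

import (
	"context"
	"fmt"
	"time"
)

// drainAndStop polls every interval until pending() reports zero
// in-flight items, then calls stop. The ctx case is an illustrative
// addition; the watcher's Destroy above keeps polling unconditionally.
func drainAndStop(ctx context.Context, interval time.Duration, pending func() int, stop func()) error {
	tt := time.NewTicker(interval)
	defer tt.Stop()
	for {
		select {
		case <-tt.C:
			if pending() > 0 {
				continue // some workers are still out doing work
			}
			stop()
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	left := 3
	err := drainAndStop(context.Background(), 10*time.Millisecond,
		func() int { left--; return left },            // workers trickle back
		func() { fmt.Println("all workers stopped") }, // kill phase
	)
	fmt.Println(err) // <nil>
}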
// Warning: this is an O(n) operation, and it returns a copy of the actual workers
func (ww *workerWatcher) List() []worker.BaseProcess {
- return ww.stack.Workers()
+ ww.Lock()
+ defer ww.Unlock()
+
+ base := make([]worker.BaseProcess, 0, len(ww.workers))
+ for i := 0; i < len(ww.workers); i++ {
+ base = append(base, ww.workers[i])
+ }
+
+ return base
}
func (ww *workerWatcher) wait(w worker.BaseProcess) {
@@ -174,7 +240,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
return
}
- _ = ww.stack.FindAndRemoveByPid(w.Pid())
+ ww.Remove(w)
err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{