diff options
author | Valery Piashchynski <[email protected]> | 2021-06-26 00:10:45 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-26 00:10:45 +0300 |
commit | 7c0b63a9d0cee75e8bdedd2dcd0c20139f3ffd3f (patch) | |
tree | dbc09a665fb75d51f4b01a83cd80b7f2ddab0aae /pkg | |
parent | ae58af3ca1d37cb61f106146e4bf9bd1d033e8b3 (diff) |
- Better channel handling
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/worker_watcher/container/interface.go | 4 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 21 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 105 |
3 files changed, 64 insertions, 66 deletions
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go index 0e648d34..e10ecdae 100644 --- a/pkg/worker_watcher/container/interface.go +++ b/pkg/worker_watcher/container/interface.go @@ -1,6 +1,8 @@ package container import ( + "context" + "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -9,7 +11,7 @@ type Vector interface { // Enqueue used to put worker to the vector Enqueue(worker.BaseProcess) // Dequeue used to get worker from the vector - Dequeue() (worker.BaseProcess, bool) + Dequeue(ctx context.Context) (worker.BaseProcess, error) // Destroy used to stop releasing the workers Destroy() } diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go index 1ab9d073..8072af10 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/vec.go @@ -1,20 +1,20 @@ package container import ( + "context" "sync/atomic" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/worker" ) type Vec struct { - wqLen uint64 destroy uint64 workers chan worker.BaseProcess } func NewVector(initialNumOfWorkers uint64) Vector { vec := &Vec{ - wqLen: 0, destroy: 0, workers: make(chan worker.BaseProcess, initialNumOfWorkers), } @@ -23,11 +23,10 @@ func NewVector(initialNumOfWorkers uint64) Vector { } func (v *Vec) Enqueue(w worker.BaseProcess) { - atomic.AddUint64(&v.wqLen, 1) v.workers <- w } -func (v *Vec) Dequeue() (worker.BaseProcess, bool) { +func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) { /* if *addr == old { *addr = new @@ -36,15 +35,17 @@ func (v *Vec) Dequeue() (worker.BaseProcess, bool) { */ if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) { - return nil, true + return nil, errors.E(errors.WatcherStopped) } - if num := atomic.LoadUint64(&v.wqLen); num > 0 { - atomic.AddUint64(&v.wqLen, ^uint64(0)) - return <-v.workers, false + for { + select { + case w := <-v.workers: + return w, nil + case <-ctx.Done(): + return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) + } } - - return nil, false } func (v *Vec) Destroy() { diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 9d66a75c..f82de958 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -51,64 +51,59 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") + // thread safe operation + w, err := ww.container.Dequeue(ctx) + if errors.Is(errors.WatcherStopped, err) { + return nil, errors.E(op, errors.WatcherStopped) + } + + if err != nil { + return nil, errors.E(op, err) + } + + // fast path, worker not nil and in the ReadyState + if w.State().Value() == worker.StateReady { + return w, nil + } + + // ========================================================= + // SLOW PATH + _ = w.Kill() // how the worker get here??????? + // no free workers in the container + // try to continuously get free one for { - select { - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers) - default: - // thread safe operation - w, stop := ww.container.Dequeue() - if stop { - return nil, errors.E(op, errors.WatcherStopped) - } + w, err = ww.container.Dequeue(ctx) - if w == nil { - continue - } + if errors.Is(errors.WatcherStopped, err) { + return nil, errors.E(op, errors.WatcherStopped) + } - // fast path, worker not nil and in the ReadyState - if w.State().Value() == worker.StateReady { - return w, nil - } - // ========================================================= - // SLOW PATH - _ = w.Kill() // how the worker get here??????? - // no free workers in the container - // try to continuously get free one - for { - w, stop = ww.container.Dequeue() - if stop { - return nil, errors.E(op, errors.WatcherStopped) - } - - if w == nil { - continue - } - - switch w.State().Value() { - // return only workers in the Ready state - // check first - case worker.StateReady: - return w, nil - case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work - continue - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateStopping: - // worker doing no work because it in the container - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // try to get new worker - continue - } - } + if err != nil { + return nil, errors.E(op, err) + } + + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + return w, nil + case worker.StateWorking: // how?? + ww.container.Enqueue(w) // put it back, let worker finish the work + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue } } } |