diff options
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 122 |
1 files changed, 49 insertions, 73 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 108756fc..f82de958 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -47,88 +47,64 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { return nil } -// return value from Get -type get struct { - w worker.BaseProcess - err error -} - // Get is not a thread safe operation func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { - c := make(chan get, 1) const op = errors.Op("worker_watcher_get_free_worker") - go func() { - // FAST PATH - // thread safe operation - w, stop := ww.container.Dequeue() - if stop { - c <- get{ - nil, - errors.E(op, errors.WatcherStopped), - } - return - } - // fast path, worker not nil and in the ReadyState - if w.State().Value() == worker.StateReady { - c <- get{ - w, - nil, - } - return + // thread safe operation + w, err := ww.container.Dequeue(ctx) + if errors.Is(errors.WatcherStopped, err) { + return nil, errors.E(op, errors.WatcherStopped) + } + + if err != nil { + return nil, errors.E(op, err) + } + + // fast path, worker not nil and in the ReadyState + if w.State().Value() == worker.StateReady { + return w, nil + } + + // ========================================================= + // SLOW PATH + _ = w.Kill() // how the worker get here??????? + // no free workers in the container + // try to continuously get free one + for { + w, err = ww.container.Dequeue(ctx) + + if errors.Is(errors.WatcherStopped, err) { + return nil, errors.E(op, errors.WatcherStopped) } - // ========================================================= - // SLOW PATH - _ = w.Kill() // how the worker get here??????? - // no free workers in the container - // try to continuously get free one - for { - w, stop = ww.container.Dequeue() - if stop { - c <- get{ - nil, - errors.E(op, errors.WatcherStopped), - } - } - switch w.State().Value() { - // return only workers in the Ready state - // check first - case worker.StateReady: - c <- get{ - w, - nil, - } - return - case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work - continue - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateStopping: - // worker doing no work because it in the container - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // try to get new worker - continue - } + if err != nil { + return nil, errors.E(op, err) } - }() - select { - case r := <-c: - if r.err != nil { - return nil, r.err + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + return w, nil + case worker.StateWorking: // how?? + ww.container.Enqueue(w) // put it back, let worker finish the work + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue } - return r.w, nil - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed")) } } |