diff options
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 73 |
1 files changed, 37 insertions, 36 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index cc8cc2b6..a6dfe43e 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -79,47 +79,44 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { } // ========================================================= // SLOW PATH - _ = w.Kill() + _ = w.Kill() // how the worker get here??????? // no free workers in the container // try to continuously get free one for { - select { - default: - w, stop = ww.container.Dequeue() - if stop { - c <- get{ - nil, - errors.E(op, errors.WatcherStopped), - } + w, stop = ww.container.Dequeue() + if stop { + c <- get{ + nil, + errors.E(op, errors.WatcherStopped), } + } - switch w.State().Value() { - // return only workers in the Ready state - // check first - case worker.StateReady: - c <- get{ - w, - nil, - } - return - case worker.StateWorking: // how?? - ww.container.Enqueue(w) // put it back, let worker finish the work - continue - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateStopping: - // worker doing no work because it in the container - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // try to get new worker - continue + switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + c <- get{ + w, + nil, } + return + case worker.StateWorking: // how?? + ww.container.Enqueue(w) // put it back, let worker finish the work + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateStopping: + // worker doing no work because it in the container + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue } } }() @@ -177,6 +174,10 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) { // O(1) operation func (ww *workerWatcher) Push(w worker.BaseProcess) { + if w.State().Value() != worker.StateReady { + _ = w.Kill() + return + } ww.container.Enqueue(w) } @@ -190,7 +191,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { tt := time.NewTicker(time.Millisecond * 100) defer tt.Stop() - for { + for { //nolint:gosimple select { case <-tt.C: ww.Lock() |