summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go73
1 files changed, 37 insertions, 36 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index cc8cc2b6..a6dfe43e 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -79,47 +79,44 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
}
// =========================================================
// SLOW PATH
- _ = w.Kill()
+ _ = w.Kill() // how the worker get here???????
// no free workers in the container
// try to continuously get free one
for {
- select {
- default:
- w, stop = ww.container.Dequeue()
- if stop {
- c <- get{
- nil,
- errors.E(op, errors.WatcherStopped),
- }
+ w, stop = ww.container.Dequeue()
+ if stop {
+ c <- get{
+ nil,
+ errors.E(op, errors.WatcherStopped),
}
+ }
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- c <- get{
- w,
- nil,
- }
- return
- case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateStopping:
- // worker doing no work because it in the container
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
- continue
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ c <- get{
+ w,
+ nil,
}
+ return
+ case worker.StateWorking: // how??
+ ww.container.Enqueue(w) // put it back, let worker finish the work
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // worker doing no work because it in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
}
}
}()
@@ -177,6 +174,10 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
// O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
+ if w.State().Value() != worker.StateReady {
+ _ = w.Kill()
+ return
+ }
ww.container.Enqueue(w)
}
@@ -190,7 +191,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
tt := time.NewTicker(time.Millisecond * 100)
defer tt.Stop()
- for {
+ for { //nolint:gosimple
select {
case <-tt.C:
ww.Lock()