diff options
author | Valery Piashchynski <[email protected]> | 2021-02-05 01:07:26 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-05 01:07:26 +0300 |
commit | 5e012c6f2c822858b63638325804524250992a42 (patch) | |
tree | 6832f8c5079c098d001792071b03d5ca23f22374 /pkg/worker_watcher | |
parent | d629f08408a4478aaba90079a4e37ab69cfc12ef (diff) |
handle worker state before sending to the exec
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r-- | pkg/worker_watcher/stack_test.go | 2 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 53 |
2 files changed, 34 insertions, 21 deletions
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go index 5287a6dc..5fc45adc 100644 --- a/pkg/worker_watcher/stack_test.go +++ b/pkg/worker_watcher/stack_test.go @@ -140,3 +140,5 @@ func TestStack_DestroyWithWait(t *testing.T) { stack.Destroy(context.Background()) assert.Equal(t, uint64(0), stack.actualNumOfWorkers) } + + diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 6b9e9dbf..93db7317 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -48,26 +48,37 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { // handle worker remove state // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == worker.StateRemove { - err := ww.Remove(w) - if err != nil { - return nil, err - } - // try to get next - return ww.Get(ctx) - } - - // if worker not in the ready state it possibly corrupted - if w != nil && w.State().Value() != worker.StateReady { - err := ww.Remove(w) - if err != nil { - return nil, err + if w != nil { + switch w.State().Value() { + case worker.StateRemove: + err := ww.Remove(w) + if err != nil { + return nil, err + } + // try to get next + return ww.Get(ctx) + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateWorking, // ??? how + worker.StateStopping: + // worker doing no work because it in the stack + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // and recursively try to get the next worker + return ww.Get(ctx) + // return only workers in the Ready state + case worker.StateReady: + return w, nil } - // try to get next - return ww.Get(ctx) } - // no free stack + // no free workers in the stack if w == nil { for { select { @@ -104,15 +115,15 @@ func (ww *workerWatcher) Allocate() error { return nil } +// Remove func (ww *workerWatcher) Remove(wb worker.SyncWorker) error { ww.mutex.Lock() defer ww.mutex.Unlock() const op = errors.Op("worker_watcher_remove_worker") - pid := wb.Pid() - - if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(worker.StateRemove) + // set remove state + wb.State().Set(worker.StateRemove) + if ww.stack.FindAndRemoveByPid(wb.Pid()) { err := wb.Kill() if err != nil { return errors.E(op, err) |