summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-05 01:07:26 +0300
committerValery Piashchynski <[email protected]>2021-02-05 01:07:26 +0300
commit5e012c6f2c822858b63638325804524250992a42 (patch)
tree6832f8c5079c098d001792071b03d5ca23f22374 /pkg/worker_watcher
parentd629f08408a4478aaba90079a4e37ab69cfc12ef (diff)
handle worker state before sending to the exec
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--pkg/worker_watcher/stack_test.go2
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go53
2 files changed, 34 insertions, 21 deletions
diff --git a/pkg/worker_watcher/stack_test.go b/pkg/worker_watcher/stack_test.go
index 5287a6dc..5fc45adc 100644
--- a/pkg/worker_watcher/stack_test.go
+++ b/pkg/worker_watcher/stack_test.go
@@ -140,3 +140,5 @@ func TestStack_DestroyWithWait(t *testing.T) {
stack.Destroy(context.Background())
assert.Equal(t, uint64(0), stack.actualNumOfWorkers)
}
+
+
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 6b9e9dbf..93db7317 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -48,26 +48,37 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
// handle worker remove state
// in this state worker is destroyed by supervisor
- if w != nil && w.State().Value() == worker.StateRemove {
- err := ww.Remove(w)
- if err != nil {
- return nil, err
- }
- // try to get next
- return ww.Get(ctx)
- }
-
- // if worker not in the ready state it possibly corrupted
- if w != nil && w.State().Value() != worker.StateReady {
- err := ww.Remove(w)
- if err != nil {
- return nil, err
+ if w != nil {
+ switch w.State().Value() {
+ case worker.StateRemove:
+ err := ww.Remove(w)
+ if err != nil {
+ return nil, err
+ }
+ // try to get next
+ return ww.Get(ctx)
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateWorking, // ??? how
+ worker.StateStopping:
+ // worker doing no work because it in the stack
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // and recursively try to get the next worker
+ return ww.Get(ctx)
+ // return only workers in the Ready state
+ case worker.StateReady:
+ return w, nil
}
- // try to get next
- return ww.Get(ctx)
}
- // no free stack
+ // no free workers in the stack
if w == nil {
for {
select {
@@ -104,15 +115,15 @@ func (ww *workerWatcher) Allocate() error {
return nil
}
+// Remove
func (ww *workerWatcher) Remove(wb worker.SyncWorker) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
const op = errors.Op("worker_watcher_remove_worker")
- pid := wb.Pid()
-
- if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(worker.StateRemove)
+ // set remove state
+ wb.State().Set(worker.StateRemove)
+ if ww.stack.FindAndRemoveByPid(wb.Pid()) {
err := wb.Kill()
if err != nil {
return errors.E(op, err)