summary | refs | log | tree | commit | diff
path: root/pkg/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
author Valery Piashchynski <[email protected]> 2021-06-25 22:07:52 +0300
committer Valery Piashchynski <[email protected]> 2021-06-25 22:07:52 +0300
commit ae58af3ca1d37cb61f106146e4bf9bd1d033e8b3 (patch)
tree 8d14a41607080298e1f0441a9ad620aee17da39e /pkg/worker_watcher/worker_watcher.go
parent e9249c7896331bab97a18a7ee0db17803fdd31fb (diff)
- Fix bug with an exec_ttl reallocation issue
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x pkg/worker_watcher/worker_watcher.go 123
1 files changed, 52 insertions, 71 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 108756fc..9d66a75c 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -47,88 +47,69 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
return nil
}
-// return value from Get
-type get struct {
- w worker.BaseProcess
- err error
-}
-
// Get is not a thread safe operation
func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
- c := make(chan get, 1)
const op = errors.Op("worker_watcher_get_free_worker")
- go func() {
- // FAST PATH
- // thread safe operation
- w, stop := ww.container.Dequeue()
- if stop {
- c <- get{
- nil,
- errors.E(op, errors.WatcherStopped),
- }
- return
- }
- // fast path, worker not nil and in the ReadyState
- if w.State().Value() == worker.StateReady {
- c <- get{
- w,
- nil,
- }
- return
- }
- // =========================================================
- // SLOW PATH
- _ = w.Kill() // how the worker get here???????
- // no free workers in the container
- // try to continuously get free one
- for {
- w, stop = ww.container.Dequeue()
+ for {
+ select {
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers)
+ default:
+ // thread safe operation
+ w, stop := ww.container.Dequeue()
if stop {
- c <- get{
- nil,
- errors.E(op, errors.WatcherStopped),
- }
+ return nil, errors.E(op, errors.WatcherStopped)
}
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- c <- get{
- w,
- nil,
- }
- return
- case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateStopping:
- // worker doing no work because it in the container
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
+ if w == nil {
continue
}
- }
- }()
- select {
- case r := <-c:
- if r.err != nil {
- return nil, r.err
+ // fast path, worker not nil and in the ReadyState
+ if w.State().Value() == worker.StateReady {
+ return w, nil
+ }
+ // =========================================================
+ // SLOW PATH
+ _ = w.Kill() // how the worker get here???????
+ // no free workers in the container
+ // try to continuously get free one
+ for {
+ w, stop = ww.container.Dequeue()
+ if stop {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+
+ if w == nil {
+ continue
+ }
+
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ return w, nil
+ case worker.StateWorking: // how??
+ ww.container.Enqueue(w) // put it back, let worker finish the work
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // worker doing no work because it in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
+ }
+ }
}
- return r.w, nil
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed"))
}
}