diff options
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 90 |
1 files changed, 45 insertions, 45 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 93db7317..1e229d9d 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -38,66 +38,66 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error { return nil } +// Get is not a thread safe operation func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { const op = errors.Op("worker_watcher_get_free_worker") + // FAST PATH // thread safe operation w, stop := ww.stack.Pop() if stop { return nil, errors.E(op, errors.WatcherStopped) } - // handle worker remove state - // in this state worker is destroyed by supervisor - if w != nil { - switch w.State().Value() { - case worker.StateRemove: - err := ww.Remove(w) - if err != nil { - return nil, err - } - // try to get next - return ww.Get(ctx) - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateWorking, // ??? how - worker.StateStopping: - // worker doing no work because it in the stack - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // and recursively try to get the next worker - return ww.Get(ctx) - // return only workers in the Ready state - case worker.StateReady: - return w, nil - } + // fast path, worker not nil and in the ReadyState + if w != nil && w.State().Value() == worker.StateReady { + return w, nil } - + // ========================================================= + // SLOW PATH // no free workers in the stack - if w == nil { - for { - select { - default: - w, stop = ww.stack.Pop() - if stop { - return nil, errors.E(op, errors.WatcherStopped) - } - if w == nil { - continue + // try to continuously get free one + for { + select { + default: + w, stop = ww.stack.Pop() + if stop { + return nil, errors.E(op, errors.WatcherStopped) + } + if w == nil { + continue + } + + switch w.State().Value() { + case worker.StateRemove: + err := ww.Remove(w) + if err != nil { + return nil, errors.E(op, err) } + // try to get next + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateWorking, // ??? how + worker.StateStopping: + // worker doing no work because it in the stack + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue + // return only workers in the Ready state + case worker.StateReady: return w, nil - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } } - - return w, nil } func (ww *workerWatcher) Allocate() error { |