diff options
author | Valery Piashchynski <[email protected]> | 2021-02-08 00:23:14 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-08 00:23:14 +0300 |
commit | d353bc5f0be991ad44208e48ed04dc61ee53c340 (patch) | |
tree | 4ac78a7219e03aaf31af042581109dc3c9ab84f6 /pkg/worker_watcher | |
parent | ae8af5413143636d5fe52ddaffa5d9122681bc20 (diff) |
Rewrite stack.Get operation w/o recursion calls
Add fast and slow paths
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r-- | pkg/worker_watcher/stack.go | 1 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 90 |
2 files changed, 46 insertions, 45 deletions
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index 51c3d016..9a0bc6a4 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -89,6 +89,7 @@ func (stack *Stack) Workers() []worker.SyncWorker { defer stack.mutex.Unlock() workersCopy := make([]worker.SyncWorker, 0, 1) // copy + // TODO pointers, copy have no sense for _, v := range stack.workers { if v != nil { workersCopy = append(workersCopy, v) diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 93db7317..1e229d9d 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -38,66 +38,66 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error { return nil } +// Get is not a thread safe operation func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { const op = errors.Op("worker_watcher_get_free_worker") + // FAST PATH // thread safe operation w, stop := ww.stack.Pop() if stop { return nil, errors.E(op, errors.WatcherStopped) } - // handle worker remove state - // in this state worker is destroyed by supervisor - if w != nil { - switch w.State().Value() { - case worker.StateRemove: - err := ww.Remove(w) - if err != nil { - return nil, err - } - // try to get next - return ww.Get(ctx) - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateWorking, // ??? how - worker.StateStopping: - // worker doing no work because it in the stack - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // and recursively try to get the next worker - return ww.Get(ctx) - // return only workers in the Ready state - case worker.StateReady: - return w, nil - } + // fast path, worker not nil and in the ReadyState + if w != nil && w.State().Value() == worker.StateReady { + return w, nil } - + // ========================================================= + // SLOW PATH // no free workers in the stack - if w == nil { - for { - select { - default: - w, stop = ww.stack.Pop() - if stop { - return nil, errors.E(op, errors.WatcherStopped) - } - if w == nil { - continue + // try to continuously get free one + for { + select { + default: + w, stop = ww.stack.Pop() + if stop { + return nil, errors.E(op, errors.WatcherStopped) + } + if w == nil { + continue + } + + switch w.State().Value() { + case worker.StateRemove: + err := ww.Remove(w) + if err != nil { + return nil, errors.E(op, err) } + // try to get next + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateWorking, // ??? how + worker.StateStopping: + // worker doing no work because it in the stack + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue + // return only workers in the Ready state + case worker.StateReady: return w, nil - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } } - - return w, nil } func (ww *workerWatcher) Allocate() error { |