summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-08 00:23:14 +0300
committerValery Piashchynski <[email protected]>2021-02-08 00:23:14 +0300
commitd353bc5f0be991ad44208e48ed04dc61ee53c340 (patch)
tree4ac78a7219e03aaf31af042581109dc3c9ab84f6 /pkg/worker_watcher/worker_watcher.go
parentae8af5413143636d5fe52ddaffa5d9122681bc20 (diff)
Rewrite stack.Get operation w/o recursion calls
Add fast and slow paths
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go90
1 files changed, 45 insertions, 45 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 93db7317..1e229d9d 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -38,66 +38,66 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error {
return nil
}
+// Get is not a thread safe operation
func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
const op = errors.Op("worker_watcher_get_free_worker")
+ // FAST PATH
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
return nil, errors.E(op, errors.WatcherStopped)
}
- // handle worker remove state
- // in this state worker is destroyed by supervisor
- if w != nil {
- switch w.State().Value() {
- case worker.StateRemove:
- err := ww.Remove(w)
- if err != nil {
- return nil, err
- }
- // try to get next
- return ww.Get(ctx)
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateWorking, // ??? how
- worker.StateStopping:
- // worker doing no work because it in the stack
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // and recursively try to get the next worker
- return ww.Get(ctx)
- // return only workers in the Ready state
- case worker.StateReady:
- return w, nil
- }
+ // fast path, worker not nil and in the ReadyState
+ if w != nil && w.State().Value() == worker.StateReady {
+ return w, nil
}
-
+ // =========================================================
+ // SLOW PATH
// no free workers in the stack
- if w == nil {
- for {
- select {
- default:
- w, stop = ww.stack.Pop()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
- }
- if w == nil {
- continue
+ // try to continuously get free one
+ for {
+ select {
+ default:
+ w, stop = ww.stack.Pop()
+ if stop {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+ if w == nil {
+ continue
+ }
+
+ switch w.State().Value() {
+ case worker.StateRemove:
+ err := ww.Remove(w)
+ if err != nil {
+ return nil, errors.E(op, err)
}
+ // try to get next
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateWorking, // ??? how
+ worker.StateStopping:
+ // worker doing no work because it in the stack
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
+ // return only workers in the Ready state
+ case worker.StateReady:
return w, nil
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
}
-
- return w, nil
}
func (ww *workerWatcher) Allocate() error {