summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/worker_watcher/stack.go1
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go90
2 files changed, 46 insertions, 45 deletions
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index 51c3d016..9a0bc6a4 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -89,6 +89,7 @@ func (stack *Stack) Workers() []worker.SyncWorker {
defer stack.mutex.Unlock()
workersCopy := make([]worker.SyncWorker, 0, 1)
// copy
+ // TODO pointers, copy have no sense
for _, v := range stack.workers {
if v != nil {
workersCopy = append(workersCopy, v)
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 93db7317..1e229d9d 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -38,66 +38,66 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error {
return nil
}
+// Get is not a thread safe operation
func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
const op = errors.Op("worker_watcher_get_free_worker")
+ // FAST PATH
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
return nil, errors.E(op, errors.WatcherStopped)
}
- // handle worker remove state
- // in this state worker is destroyed by supervisor
- if w != nil {
- switch w.State().Value() {
- case worker.StateRemove:
- err := ww.Remove(w)
- if err != nil {
- return nil, err
- }
- // try to get next
- return ww.Get(ctx)
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateWorking, // ??? how
- worker.StateStopping:
- // worker doing no work because it in the stack
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // and recursively try to get the next worker
- return ww.Get(ctx)
- // return only workers in the Ready state
- case worker.StateReady:
- return w, nil
- }
+ // fast path, worker not nil and in the ReadyState
+ if w != nil && w.State().Value() == worker.StateReady {
+ return w, nil
}
-
+ // =========================================================
+ // SLOW PATH
// no free workers in the stack
- if w == nil {
- for {
- select {
- default:
- w, stop = ww.stack.Pop()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
- }
- if w == nil {
- continue
+ // try to continuously get free one
+ for {
+ select {
+ default:
+ w, stop = ww.stack.Pop()
+ if stop {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+ if w == nil {
+ continue
+ }
+
+ switch w.State().Value() {
+ case worker.StateRemove:
+ err := ww.Remove(w)
+ if err != nil {
+ return nil, errors.E(op, err)
}
+ // try to get next
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateWorking, // ??? how
+ worker.StateStopping:
+ // worker doing no work because it in the stack
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
+ // return only workers in the Ready state
+ case worker.StateReady:
return w, nil
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
}
-
- return w, nil
}
func (ww *workerWatcher) Allocate() error {