diff options
-rw-r--r-- | go.sum | 2 | ||||
-rw-r--r-- | pkg/worker_watcher/stack.go | 1 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 90 | ||||
-rwxr-xr-x | rr | bin | 0 -> 32768 bytes |
4 files changed, 46 insertions, 47 deletions
@@ -536,8 +536,6 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spiral/endure v1.0.0-beta.22 h1:zOhrQ49DeYfr1rOHfUy573pjWpyxkG30vVz4o0ejvaQ= -github.com/spiral/endure v1.0.0-beta.22/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE= github.com/spiral/endure v1.0.0-beta.23 h1:iIK+lrOTaWUyJpENxvjNjlhBA0QIrhks1uxcza3bmUQ= github.com/spiral/endure v1.0.0-beta.23/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE= github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o= diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index 51c3d016..9a0bc6a4 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -89,6 +89,7 @@ func (stack *Stack) Workers() []worker.SyncWorker { defer stack.mutex.Unlock() workersCopy := make([]worker.SyncWorker, 0, 1) // copy + // TODO pointers, copy have no sense for _, v := range stack.workers { if v != nil { workersCopy = append(workersCopy, v) diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 93db7317..1e229d9d 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -38,66 +38,66 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error { return nil } +// Get is not a thread safe operation func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { const op = errors.Op("worker_watcher_get_free_worker") + // FAST PATH // thread safe operation w, stop := ww.stack.Pop() if stop { return nil, errors.E(op, errors.WatcherStopped) } - // handle worker remove state - // in this state worker is destroyed by supervisor - if w != nil { - switch w.State().Value() { - case worker.StateRemove: - err := ww.Remove(w) - if err != nil { - return nil, err - } - // try to get next - return ww.Get(ctx) - case - // all the possible wrong states - worker.StateInactive, - worker.StateDestroyed, - worker.StateErrored, - worker.StateStopped, - worker.StateInvalid, - worker.StateKilling, - worker.StateWorking, // ??? how - worker.StateStopping: - // worker doing no work because it in the stack - // so we can safely kill it (inconsistent state) - _ = w.Kill() - // and recursively try to get the next worker - return ww.Get(ctx) - // return only workers in the Ready state - case worker.StateReady: - return w, nil - } + // fast path, worker not nil and in the ReadyState + if w != nil && w.State().Value() == worker.StateReady { + return w, nil } - + // ========================================================= + // SLOW PATH // no free workers in the stack - if w == nil { - for { - select { - default: - w, stop = ww.stack.Pop() - if stop { - return nil, errors.E(op, errors.WatcherStopped) - } - if w == nil { - continue + // try to continuously get free one + for { + select { + default: + w, stop = ww.stack.Pop() + if stop { + return nil, errors.E(op, errors.WatcherStopped) + } + if w == nil { + continue + } + + switch w.State().Value() { + case worker.StateRemove: + err := ww.Remove(w) + if err != nil { + return nil, errors.E(op, err) } + // try to get next + continue + case + // all the possible wrong states + worker.StateInactive, + worker.StateDestroyed, + worker.StateErrored, + worker.StateStopped, + worker.StateInvalid, + worker.StateKilling, + worker.StateWorking, // ??? how + worker.StateStopping: + // worker doing no work because it in the stack + // so we can safely kill it (inconsistent state) + _ = w.Kill() + // try to get new worker + continue + // return only workers in the Ready state + case worker.StateReady: return w, nil - case <-ctx.Done(): - return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } + case <-ctx.Done(): + return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) } } - - return w, nil } func (ww *workerWatcher) Allocate() error { Binary files differ |