summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.sum2
-rw-r--r--pkg/worker_watcher/stack.go1
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go90
-rwxr-xr-xrrbin0 -> 32768 bytes
4 files changed, 46 insertions, 47 deletions
diff --git a/go.sum b/go.sum
index 6789dfe3..53d3b82f 100644
--- a/go.sum
+++ b/go.sum
@@ -536,8 +536,6 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
-github.com/spiral/endure v1.0.0-beta.22 h1:zOhrQ49DeYfr1rOHfUy573pjWpyxkG30vVz4o0ejvaQ=
-github.com/spiral/endure v1.0.0-beta.22/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE=
github.com/spiral/endure v1.0.0-beta.23 h1:iIK+lrOTaWUyJpENxvjNjlhBA0QIrhks1uxcza3bmUQ=
github.com/spiral/endure v1.0.0-beta.23/go.mod h1:+gB0/jI9tXdHgv0x4P9vXLER8fLgwt9a7aPi0QZeJHE=
github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index 51c3d016..9a0bc6a4 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -89,6 +89,7 @@ func (stack *Stack) Workers() []worker.SyncWorker {
defer stack.mutex.Unlock()
workersCopy := make([]worker.SyncWorker, 0, 1)
// copy
+ // TODO pointers, copy have no sense
for _, v := range stack.workers {
if v != nil {
workersCopy = append(workersCopy, v)
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 93db7317..1e229d9d 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -38,66 +38,66 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error {
return nil
}
+// Get is not a thread safe operation
func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
const op = errors.Op("worker_watcher_get_free_worker")
+ // FAST PATH
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
return nil, errors.E(op, errors.WatcherStopped)
}
- // handle worker remove state
- // in this state worker is destroyed by supervisor
- if w != nil {
- switch w.State().Value() {
- case worker.StateRemove:
- err := ww.Remove(w)
- if err != nil {
- return nil, err
- }
- // try to get next
- return ww.Get(ctx)
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateWorking, // ??? how
- worker.StateStopping:
- // worker doing no work because it in the stack
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // and recursively try to get the next worker
- return ww.Get(ctx)
- // return only workers in the Ready state
- case worker.StateReady:
- return w, nil
- }
+ // fast path, worker not nil and in the ReadyState
+ if w != nil && w.State().Value() == worker.StateReady {
+ return w, nil
}
-
+ // =========================================================
+ // SLOW PATH
// no free workers in the stack
- if w == nil {
- for {
- select {
- default:
- w, stop = ww.stack.Pop()
- if stop {
- return nil, errors.E(op, errors.WatcherStopped)
- }
- if w == nil {
- continue
+ // try to continuously get free one
+ for {
+ select {
+ default:
+ w, stop = ww.stack.Pop()
+ if stop {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+ if w == nil {
+ continue
+ }
+
+ switch w.State().Value() {
+ case worker.StateRemove:
+ err := ww.Remove(w)
+ if err != nil {
+ return nil, errors.E(op, err)
}
+ // try to get next
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateWorking, // ??? how
+ worker.StateStopping:
+ // worker doing no work because it in the stack
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
+ // return only workers in the Ready state
+ case worker.StateReady:
return w, nil
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
+ case <-ctx.Done():
+ return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
}
}
-
- return w, nil
}
func (ww *workerWatcher) Allocate() error {
diff --git a/rr b/rr
new file mode 100755
index 00000000..8a7ac396
--- /dev/null
+++ b/rr
Binary files differ