summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go25
1 files changed, 13 insertions, 12 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index d065bae5..2380c190 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -27,11 +27,11 @@ type workerWatcher struct {
events events.Handler
}
-func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error {
+func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
ww.stack.Push(workers[i])
- go func(swc worker.SyncWorker) {
+ go func(swc worker.BaseProcess) {
ww.wait(swc)
}(workers[i])
}
@@ -39,7 +39,7 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error {
}
// Get is not a thread safe operation
-func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
+func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// FAST PATH
// thread safe operation
@@ -72,6 +72,10 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
}
switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ return w, nil
case worker.StateRemove:
err := ww.Remove(w)
if err != nil {
@@ -94,9 +98,6 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
_ = w.Kill()
// try to get new worker
continue
- // return only workers in the Ready state
- case worker.StateReady:
- return w, nil
}
case <-ctx.Done():
return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
@@ -105,7 +106,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
}
func (ww *workerWatcher) Allocate() error {
- ww.stack.mutex.Lock()
+ ww.mutex.Lock()
const op = errors.Op("worker_watcher_allocate_new")
sw, err := ww.allocator()
if err != nil {
@@ -113,14 +114,14 @@ func (ww *workerWatcher) Allocate() error {
}
ww.addToWatch(sw)
- ww.stack.mutex.Unlock()
+ ww.mutex.Unlock()
ww.Push(sw)
return nil
}
// Remove
-func (ww *workerWatcher) Remove(wb worker.SyncWorker) error {
+func (ww *workerWatcher) Remove(wb worker.BaseProcess) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
@@ -139,7 +140,7 @@ func (ww *workerWatcher) Remove(wb worker.SyncWorker) error {
}
// O(1) operation
-func (ww *workerWatcher) Push(w worker.SyncWorker) {
+func (ww *workerWatcher) Push(w worker.BaseProcess) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
ww.stack.Push(w)
@@ -152,7 +153,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) List() []worker.SyncWorker {
+func (ww *workerWatcher) List() []worker.BaseProcess {
return ww.stack.Workers()
}
@@ -183,7 +184,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
}
}
-func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) {
+func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) {
go func() {
ww.wait(wb)
}()