diff options
author | Valery Piashchynski <[email protected]> | 2021-02-08 23:21:54 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-08 23:21:54 +0300 |
commit | da64d9fbab7d73e203e7dbbb9503f4d422feaab0 (patch) | |
tree | 3dc3d5dd4a8c4de7d4b57baf2eeb1089f831bc1c /pkg/worker_watcher/worker_watcher.go | |
parent | 3e92e3df723ca1c4f152d8526eebfd7184e6fcec (diff) |
BaseProcess interface as a return type in the worker_watcher,pool and
worker interface
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 25 |
1 files changed, 13 insertions, 12 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index d065bae5..2380c190 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -27,11 +27,11 @@ type workerWatcher struct { events events.Handler } -func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error { +func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error { for i := 0; i < len(workers); i++ { ww.stack.Push(workers[i]) - go func(swc worker.SyncWorker) { + go func(swc worker.BaseProcess) { ww.wait(swc) }(workers[i]) } @@ -39,7 +39,7 @@ func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error { } // Get is not a thread safe operation -func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { +func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) { const op = errors.Op("worker_watcher_get_free_worker") // FAST PATH // thread safe operation @@ -72,6 +72,10 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { } switch w.State().Value() { + // return only workers in the Ready state + // check first + case worker.StateReady: + return w, nil case worker.StateRemove: err := ww.Remove(w) if err != nil { @@ -94,9 +98,6 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { _ = w.Kill() // try to get new worker continue - // return only workers in the Ready state - case worker.StateReady: - return w, nil } case <-ctx.Done(): return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed")) @@ -105,7 +106,7 @@ func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { } func (ww *workerWatcher) Allocate() error { - ww.stack.mutex.Lock() + ww.mutex.Lock() const op = errors.Op("worker_watcher_allocate_new") sw, err := ww.allocator() if err != nil { @@ -113,14 +114,14 @@ func (ww *workerWatcher) Allocate() error { } ww.addToWatch(sw) - ww.stack.mutex.Unlock() + ww.mutex.Unlock() ww.Push(sw) return nil } // Remove -func (ww *workerWatcher) Remove(wb worker.SyncWorker) error { +func (ww *workerWatcher) Remove(wb worker.BaseProcess) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -139,7 +140,7 @@ func (ww *workerWatcher) Remove(wb worker.SyncWorker) error { } // O(1) operation -func (ww *workerWatcher) Push(w worker.SyncWorker) { +func (ww *workerWatcher) Push(w worker.BaseProcess) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -152,7 +153,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) List() []worker.SyncWorker { +func (ww *workerWatcher) List() []worker.BaseProcess { return ww.stack.Workers() } @@ -183,7 +184,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { } } -func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { +func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { go func() { ww.wait(wb) }() |