diff options
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 22 |
1 files changed, 11 insertions, 11 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index b0d39165..f87bd021 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -5,13 +5,13 @@ import ( "sync" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/worker" ) // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) worker.Watcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher { ww := &workerWatcher{ stack: NewWorkersStack(uint64(numWorkers)), allocator: allocator, @@ -28,18 +28,18 @@ type workerWatcher struct { events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error { +func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { for i := 0; i < len(workers); i++ { ww.stack.Push(workers[i]) - go func(swc worker.BaseProcess) { + go func(swc worker.SyncWorker) { ww.wait(swc) }(workers[i]) } return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.BaseProcess, error) { +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (*worker.SyncWorkerImpl, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation w, stop := ww.stack.Pop() @@ -94,7 +94,7 @@ func (ww *workerWatcher) AllocateNew() error { return nil } -func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { +func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -114,7 +114,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { } // O(1) operation -func (ww *workerWatcher) PushWorker(w worker.BaseProcess) { +func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -127,11 +127,11 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []worker.BaseProcess { +func (ww *workerWatcher) WorkersList() []*worker.SyncWorkerImpl { return ww.stack.Workers() } -func (ww *workerWatcher) wait(w worker.BaseProcess) { +func (ww *workerWatcher) wait(w worker.SyncWorker) { const op = errors.Op("worker_watcher_wait") err := w.Wait() if err != nil { @@ -158,7 +158,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { } } -func (ww *workerWatcher) addToWatch(wb worker.BaseProcess) { +func (ww *workerWatcher) addToWatch(wb worker.SyncWorker) { go func() { ww.wait(wb) }() |