diff options
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 38 |
1 files changed, 24 insertions, 14 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index cf2e1eb7..6b9e9dbf 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -6,7 +6,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -28,7 +27,7 @@ type workerWatcher struct { events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { +func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error { for i := 0; i < len(workers); i++ { ww.stack.Push(workers[i]) @@ -39,7 +38,7 @@ func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) { +func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation w, stop := ww.stack.Pop() @@ -49,14 +48,25 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, // handle worker remove state // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == states.StateRemove { - err := ww.RemoveWorker(w) + if w != nil && w.State().Value() == worker.StateRemove { + err := ww.Remove(w) if err != nil { return nil, err } // try to get next - return ww.GetFreeWorker(ctx) + return ww.Get(ctx) } + + // if worker not in the ready state it possibly corrupted + if w != nil && w.State().Value() != worker.StateReady { + err := ww.Remove(w) + if err != nil { + return nil, err + } + // try to get next + return ww.Get(ctx) + } + // no free stack if w == nil { for { @@ -79,7 +89,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, return w, nil } -func (ww *workerWatcher) AllocateNew() error { +func (ww *workerWatcher) Allocate() error { ww.stack.mutex.Lock() const op = errors.Op("worker_watcher_allocate_new") sw, err := ww.allocator() @@ -89,12 +99,12 @@ func (ww *workerWatcher) AllocateNew() error { ww.addToWatch(sw) ww.stack.mutex.Unlock() - ww.PushWorker(sw) + ww.Push(sw) return nil } -func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { +func (ww *workerWatcher) Remove(wb worker.SyncWorker) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -102,7 +112,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { pid := wb.Pid() if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(states.StateRemove) + wb.State().Set(worker.StateRemove) err := wb.Kill() if err != nil { return errors.E(op, err) @@ -114,7 +124,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { } // O(1) operation -func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { +func (ww *workerWatcher) Push(w worker.SyncWorker) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -127,7 +137,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []worker.SyncWorker { +func (ww *workerWatcher) List() []worker.SyncWorker { return ww.stack.Workers() } @@ -142,14 +152,14 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } _ = ww.stack.FindAndRemoveByPid(w.Pid()) - err = ww.AllocateNew() + err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ Event: events.EventPoolError, |