summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher/worker_watcher.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-04 20:37:48 +0300
committerValery Piashchynski <[email protected]>2021-02-04 20:37:48 +0300
commitd629f08408a4478aaba90079a4e37ab69cfc12ef (patch)
tree2cb67bc5c9be295428239369e9d211f3888308fe /pkg/worker_watcher/worker_watcher.go
parentefacb852e279e6bbfc076c0faff391ff39815718 (diff)
pre-rc stabilization of the interfaces and internal code
Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go38
1 files changed, 24 insertions, 14 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index cf2e1eb7..6b9e9dbf 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -6,7 +6,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -28,7 +27,7 @@ type workerWatcher struct {
events events.Handler
}
-func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
+func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error {
for i := 0; i < len(workers); i++ {
ww.stack.Push(workers[i])
@@ -39,7 +38,7 @@ func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
return nil
}
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) {
+func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
@@ -49,14 +48,25 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker,
// handle worker remove state
// in this state worker is destroyed by supervisor
- if w != nil && w.State().Value() == states.StateRemove {
- err := ww.RemoveWorker(w)
+ if w != nil && w.State().Value() == worker.StateRemove {
+ err := ww.Remove(w)
if err != nil {
return nil, err
}
// try to get next
- return ww.GetFreeWorker(ctx)
+ return ww.Get(ctx)
}
+
+ // if worker not in the ready state it possibly corrupted
+ if w != nil && w.State().Value() != worker.StateReady {
+ err := ww.Remove(w)
+ if err != nil {
+ return nil, err
+ }
+ // try to get next
+ return ww.Get(ctx)
+ }
+
// no free stack
if w == nil {
for {
@@ -79,7 +89,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker,
return w, nil
}
-func (ww *workerWatcher) AllocateNew() error {
+func (ww *workerWatcher) Allocate() error {
ww.stack.mutex.Lock()
const op = errors.Op("worker_watcher_allocate_new")
sw, err := ww.allocator()
@@ -89,12 +99,12 @@ func (ww *workerWatcher) AllocateNew() error {
ww.addToWatch(sw)
ww.stack.mutex.Unlock()
- ww.PushWorker(sw)
+ ww.Push(sw)
return nil
}
-func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
+func (ww *workerWatcher) Remove(wb worker.SyncWorker) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
@@ -102,7 +112,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
pid := wb.Pid()
if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(states.StateRemove)
+ wb.State().Set(worker.StateRemove)
err := wb.Kill()
if err != nil {
return errors.E(op, err)
@@ -114,7 +124,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
}
// O(1) operation
-func (ww *workerWatcher) PushWorker(w worker.SyncWorker) {
+func (ww *workerWatcher) Push(w worker.SyncWorker) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
ww.stack.Push(w)
@@ -127,7 +137,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []worker.SyncWorker {
+func (ww *workerWatcher) List() []worker.SyncWorker {
return ww.stack.Workers()
}
@@ -142,14 +152,14 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
_ = ww.stack.FindAndRemoveByPid(w.Pid())
- err = ww.AllocateNew()
+ err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{
Event: events.EventPoolError,