Diffstat (limited to 'pkg/worker_watcher/worker_watcher.go')
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go  78
1 file changed, 64 insertions, 14 deletions
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 348be199..bdd91423 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,12 +3,14 @@ package worker_watcher //nolint:stylecheck
import (
"context"
"sync"
+ "sync/atomic"
"time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
+ "github.com/spiral/roadrunner/v2/utils"
)
// Vector interface represents vector container
@@ -30,21 +32,24 @@ type workerWatcher struct {
sync.RWMutex
container Vector
// used to control Destroy stage (that all workers are in the container)
- numWorkers uint64
+ numWorkers *uint64
workers []worker.BaseProcess
- allocator worker.Allocator
- events events.Handler
+ allocator worker.Allocator
+ allocateTimeout time.Duration
+ events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
ww := &workerWatcher{
- container: channel.NewVector(numWorkers),
- numWorkers: numWorkers,
- workers: make([]worker.BaseProcess, 0, numWorkers),
+ container: channel.NewVector(numWorkers),
+ // pass a ptr to the number of workers to avoid blocking in the TTL loop
+ numWorkers: utils.Uint64(numWorkers),
+ allocateTimeout: allocateTimeout,
+ workers: make([]worker.BaseProcess, 0, numWorkers),
allocator: allocator,
events: events,
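
The constructor now stores the worker count behind a pointer so the TTL and allocate paths can adjust it through sync/atomic instead of holding the watcher mutex. The utils.Uint64 helper itself is not shown in this diff; presumably it just returns the address of a copy, along the lines of this sketch:

    // sketch of the assumed utils.Uint64 helper (not part of this diff)
    package utils

    // Uint64 returns a pointer to a copy of v, usable with atomic.AddUint64/LoadUint64
    func Uint64(v uint64) *uint64 {
        return &v
    }
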
@@ -128,21 +133,57 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
}
func (ww *workerWatcher) Allocate() error {
- ww.Lock()
const op = errors.Op("worker_watcher_allocate_new")
+
sw, err := ww.allocator()
if err != nil {
- return errors.E(op, errors.WorkerAllocate, err)
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
+ })
+
+ // if no timeout, return error immediately
+ if ww.allocateTimeout == 0 {
+ return errors.E(op, errors.WorkerAllocate, err)
+ }
+
+ tt := time.After(ww.allocateTimeout)
+ for {
+ select {
+ case <-tt:
+ // reduce number of workers
+ atomic.AddUint64(ww.numWorkers, ^uint64(0))
+ // timeout exceeded, the worker can't be allocated
+ return errors.E(op, errors.WorkerAllocate, err)
+ default:
+ sw, err = ww.allocator()
+ if err != nil {
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
+ })
+ continue
+ }
+
+ // reallocated
+ goto done
+ }
+ }
}
+done:
// add worker to Wait
ww.addToWatch(sw)
+ ww.Lock()
// add new worker to the workers slice (to get information about workers in parallel)
ww.workers = append(ww.workers, sw)
-
- // unlock Allocate mutex
ww.Unlock()
+
// push the worker to the container
ww.Release(sw)
return nil
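
The retry loop above decrements the worker count with atomic.AddUint64(ww.numWorkers, ^uint64(0)): sync/atomic has no subtract for unsigned integers, so adding ^uint64(0) (all bits set) wraps around and reduces the value by one. A standalone illustration of the same idiom:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        n := uint64(4)
        // adding ^uint64(0) (i.e. math.MaxUint64) wraps around, decrementing by one
        atomic.AddUint64(&n, ^uint64(0))
        fmt.Println(n) // prints 3
    }
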
@@ -160,7 +201,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
for i := 0; i < len(ww.workers); i++ {
if ww.workers[i].Pid() == pid {
ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
- // kill worker
+ // kill worker, just to be sure it's dead
_ = wb.Kill()
return
}
@@ -177,7 +218,7 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) {
}
}
-// Destroy all underlying container (but let them to complete the task)
+// Destroy all underlying container (but let them complete the task)
func (ww *workerWatcher) Destroy(_ context.Context) {
// destroy container, we don't use ww mutex here, since we should be able to push worker
ww.Lock()
@@ -192,7 +233,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
case <-tt.C:
ww.Lock()
// that might be one of the workers is working
- if ww.numWorkers != uint64(len(ww.workers)) {
+ if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
ww.Unlock()
continue
}
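
Since numWorkers is now a pointer shared with the allocate path, the drain check in Destroy has to read it via atomic.LoadUint64 rather than a plain field access. A minimal, hypothetical sketch of the same poll-until-drained idea (the names below are placeholders, not the actual Destroy code):

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    func main() {
        const numWorkers = 2
        var returned uint64

        // simulate two busy workers finishing at different times
        go func() { time.Sleep(50 * time.Millisecond); atomic.AddUint64(&returned, 1) }()
        go func() { time.Sleep(120 * time.Millisecond); atomic.AddUint64(&returned, 1) }()

        tt := time.NewTicker(10 * time.Millisecond)
        defer tt.Stop()
        for range tt.C {
            if atomic.LoadUint64(&returned) == numWorkers {
                fmt.Println("all workers are back, safe to stop them")
                return
            }
        }
    }
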
@@ -216,6 +257,10 @@ func (ww *workerWatcher) List() []worker.BaseProcess {
ww.RLock()
defer ww.RUnlock()
+ if len(ww.workers) == 0 {
+ return nil
+ }
+
base := make([]worker.BaseProcess, 0, len(ww.workers))
for i := 0; i < len(ww.workers); i++ {
base = append(base, ww.workers[i])
@@ -253,6 +298,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
Event: events.EventPoolError,
Payload: errors.E(op, err),
})
+
+ // no workers at all, panic
+ if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
+ panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
+ }
}
}
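
From the caller's side the visible change is the extra allocateTimeout argument on NewSyncWorkerWatcher. A hedged wiring sketch (the allocator and handler values below are placeholders; their real construction lives in the pool code and is not part of this diff):

    package main

    import (
        "time"

        "github.com/spiral/roadrunner/v2/pkg/events"
        "github.com/spiral/roadrunner/v2/pkg/worker"
        "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
    )

    func main() {
        // placeholders: a real allocator spawns worker processes, a real handler routes events
        var allocator worker.Allocator
        var handler events.Handler

        // 0 makes Allocate fail immediately on an allocator error;
        // a non-zero value makes it retry until the timeout fires
        ww := worker_watcher.NewSyncWorkerWatcher(allocator, 4, handler, 30*time.Second)
        _ = ww
    }
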