summaryrefslogtreecommitdiff
path: root/pkg/worker_watcher
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
committerValery Piashchynski <[email protected]>2021-08-26 18:32:51 +0300
commitefb3efa98c8555815330274f0618bfc080f4c65c (patch)
treeb3bcabdb22fade6ef06d865d60995bc15f84cf1c /pkg/worker_watcher
parent3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (diff)
Move drivers to the plugin's root.
Fix #771, add tests. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/worker_watcher')
-rw-r--r--pkg/worker_watcher/container/channel/vec.go5
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go78
2 files changed, 65 insertions, 18 deletions
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 7fb65a92..5605f1e0 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -15,14 +15,11 @@ type Vec struct {
destroy uint64
// channel with the workers
workers chan worker.BaseProcess
-
- len uint64
}
func NewVector(len uint64) *Vec {
vec := &Vec{
destroy: 0,
- len: len,
workers: make(chan worker.BaseProcess, len),
}
@@ -48,7 +45,7 @@ func (v *Vec) Push(w worker.BaseProcess) {
1. TTL is set with no requests during the TTL
2. Violated Get <-> Release operation (how ??)
*/
- for i := uint64(0); i < v.len; i++ {
+ for i := 0; i < len(v.workers); i++ {
/*
We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
*/
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 348be199..bdd91423 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,12 +3,14 @@ package worker_watcher //nolint:stylecheck
import (
"context"
"sync"
+ "sync/atomic"
"time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
+ "github.com/spiral/roadrunner/v2/utils"
)
// Vector interface represents vector container
@@ -30,21 +32,24 @@ type workerWatcher struct {
sync.RWMutex
container Vector
// used to control Destroy stage (that all workers are in the container)
- numWorkers uint64
+ numWorkers *uint64
workers []worker.BaseProcess
- allocator worker.Allocator
- events events.Handler
+ allocator worker.Allocator
+ allocateTimeout time.Duration
+ events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
ww := &workerWatcher{
- container: channel.NewVector(numWorkers),
- numWorkers: numWorkers,
+ container: channel.NewVector(numWorkers),
- workers: make([]worker.BaseProcess, 0, numWorkers),
+ // pass a ptr to the number of workers to avoid blocking in the TTL loop
+ numWorkers: utils.Uint64(numWorkers),
+ allocateTimeout: allocateTimeout,
+ workers: make([]worker.BaseProcess, 0, numWorkers),
allocator: allocator,
events: events,
@@ -128,21 +133,57 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
}
func (ww *workerWatcher) Allocate() error {
- ww.Lock()
const op = errors.Op("worker_watcher_allocate_new")
+
sw, err := ww.allocator()
if err != nil {
- return errors.E(op, errors.WorkerAllocate, err)
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
+ })
+
+ // if no timeout, return error immediately
+ if ww.allocateTimeout == 0 {
+ return errors.E(op, errors.WorkerAllocate, err)
+ }
+
+ tt := time.After(ww.allocateTimeout)
+ for {
+ select {
+ case <-tt:
+ // reduce number of workers
+ atomic.AddUint64(ww.numWorkers, ^uint64(0))
+ // timeout exceed, worker can't be allocated
+ return errors.E(op, errors.WorkerAllocate, err)
+ default:
+ sw, err = ww.allocator()
+ if err != nil {
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
+ })
+ continue
+ }
+
+ // reallocated
+ goto done
+ }
+ }
}
+done:
// add worker to Wait
ww.addToWatch(sw)
+ ww.Lock()
// add new worker to the workers slice (to get information about workers in parallel)
ww.workers = append(ww.workers, sw)
-
- // unlock Allocate mutex
ww.Unlock()
+
// push the worker to the container
ww.Release(sw)
return nil
@@ -160,7 +201,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
for i := 0; i < len(ww.workers); i++ {
if ww.workers[i].Pid() == pid {
ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
- // kill worker
+ // kill worker, just to be sure it's dead
_ = wb.Kill()
return
}
@@ -177,7 +218,7 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) {
}
}
-// Destroy all underlying container (but let them to complete the task)
+// Destroy all underlying container (but let them complete the task)
func (ww *workerWatcher) Destroy(_ context.Context) {
// destroy container, we don't use ww mutex here, since we should be able to push worker
ww.Lock()
@@ -192,7 +233,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
case <-tt.C:
ww.Lock()
// that might be one of the workers is working
- if ww.numWorkers != uint64(len(ww.workers)) {
+ if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
ww.Unlock()
continue
}
@@ -216,6 +257,10 @@ func (ww *workerWatcher) List() []worker.BaseProcess {
ww.RLock()
defer ww.RUnlock()
+ if len(ww.workers) == 0 {
+ return nil
+ }
+
base := make([]worker.BaseProcess, 0, len(ww.workers))
for i := 0; i < len(ww.workers); i++ {
base = append(base, ww.workers[i])
@@ -253,6 +298,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
Event: events.EventPoolError,
Payload: errors.E(op, err),
})
+
+ // no workers at all, panic
+ if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
+ panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
+ }
}
}