summary | refs | log | tree | commit | diff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rwxr-xr-x pkg/pool/static_pool.go 2
-rw-r--r-- pkg/pool/supervisor_test.go 50
-rw-r--r-- pkg/worker_watcher/container/channel/vec.go 5
-rwxr-xr-x pkg/worker_watcher/worker_watcher.go 78
4 files changed, 116 insertions, 19 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 3eb0714f..720ca9da 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -78,7 +78,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
// set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events, p.cfg.AllocateTimeout)
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index d1b24574..14df513e 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -2,6 +2,7 @@ package pool
import (
"context"
+ "os"
"os/exec"
"testing"
"time"
@@ -361,3 +362,52 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
<-block
p.Destroy(context.Background())
}
+
+// TestSupervisedPool_AllocateFailedOK verifies that the pool does not panic when workers
+// can no longer be allocated (the PHP script is expected to fail once "break" exists).
+func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
+	cfgExecTTL := &Config{
+		NumWorkers:      uint64(2),
+		AllocateTimeout: time.Second * 15,
+		DestroyTimeout:  time.Second * 5,
+		Supervisor: &SupervisorConfig{
+			WatchTick: 1 * time.Second,
+			TTL:       5 * time.Second,
+		},
+	}
+
+	ctx := context.Background()
+	p, err := Initialize(
+		ctx,
+		func() *exec.Cmd { return exec.Command("php", "../../tests/allocate-failed.php") },
+		pipe.NewPipeFactory(),
+		cfgExecTTL,
+	)
+	assert.NoError(t, err)
+	require.NotNil(t, p)
+
+	// Register the guard as soon as the pool exists: a defer placed as the last statement
+	// protects nothing and is skipped entirely when a require.* call aborts the test.
+	defer func() {
+		if r := recover(); r != nil {
+			assert.Fail(t, "panic should not be fired!")
+		} else {
+			p.Destroy(context.Background())
+		}
+	}()
+
+	time.Sleep(time.Second)
+	// should be ok
+	_, err = p.Exec(&payload.Payload{
+		Context: []byte(""),
+		Body:    []byte("foo"),
+	})
+	require.NoError(t, err)
+
+	// after creating this file, PHP will fail
+	file, err := os.Create("break")
+	require.NoError(t, err)
+	time.Sleep(time.Second * 5)
+	assert.NoError(t, file.Close())
+	assert.NoError(t, os.Remove("break"))
+}
diff --git a/pkg/worker_watcher/container/channel/vec.go b/pkg/worker_watcher/container/channel/vec.go
index 7fb65a92..5605f1e0 100644
--- a/pkg/worker_watcher/container/channel/vec.go
+++ b/pkg/worker_watcher/container/channel/vec.go
@@ -15,14 +15,11 @@ type Vec struct {
destroy uint64
// channel with the workers
workers chan worker.BaseProcess
-
- len uint64
}
func NewVector(len uint64) *Vec {
vec := &Vec{
destroy: 0,
- len: len,
workers: make(chan worker.BaseProcess, len),
}
@@ -48,7 +45,7 @@ func (v *Vec) Push(w worker.BaseProcess) {
1. TTL is set with no requests during the TTL
2. Violated Get <-> Release operation (how ??)
*/
- for i := uint64(0); i < v.len; i++ {
+ for i := 0; i < len(v.workers); i++ {
/*
We need to drain vector until we found a worker in the Invalid/Killing/Killed/etc states.
*/
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 348be199..bdd91423 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -3,12 +3,14 @@ package worker_watcher //nolint:stylecheck
import (
"context"
"sync"
+ "sync/atomic"
"time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container/channel"
+ "github.com/spiral/roadrunner/v2/utils"
)
// Vector interface represents vector container
@@ -30,21 +32,24 @@ type workerWatcher struct {
sync.RWMutex
container Vector
// used to control Destroy stage (that all workers are in the container)
- numWorkers uint64
+ numWorkers *uint64
workers []worker.BaseProcess
- allocator worker.Allocator
- events events.Handler
+ allocator worker.Allocator
+ allocateTimeout time.Duration
+ events events.Handler
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler, allocateTimeout time.Duration) *workerWatcher {
ww := &workerWatcher{
- container: channel.NewVector(numWorkers),
- numWorkers: numWorkers,
+ container: channel.NewVector(numWorkers),
- workers: make([]worker.BaseProcess, 0, numWorkers),
+ // pass a ptr to the number of workers to avoid blocking in the TTL loop
+ numWorkers: utils.Uint64(numWorkers),
+ allocateTimeout: allocateTimeout,
+ workers: make([]worker.BaseProcess, 0, numWorkers),
allocator: allocator,
events: events,
@@ -128,21 +133,57 @@ func (ww *workerWatcher) Take(ctx context.Context) (worker.BaseProcess, error) {
}
func (ww *workerWatcher) Allocate() error {
- ww.Lock()
const op = errors.Op("worker_watcher_allocate_new")
+
sw, err := ww.allocator()
if err != nil {
- return errors.E(op, errors.WorkerAllocate, err)
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker: %v", err)),
+ })
+
+ // if no timeout, return error immediately
+ if ww.allocateTimeout == 0 {
+ return errors.E(op, errors.WorkerAllocate, err)
+ }
+
+ tt := time.After(ww.allocateTimeout)
+ for {
+ select {
+ case <-tt:
+ // reduce number of workers
+ atomic.AddUint64(ww.numWorkers, ^uint64(0))
+ // timeout exceeded, the worker can't be allocated
+ return errors.E(op, errors.WorkerAllocate, err)
+ default:
+ sw, err = ww.allocator()
+ if err != nil {
+ // log incident
+ ww.events.Push(
+ events.WorkerEvent{
+ Event: events.EventWorkerError,
+ Payload: errors.E(op, errors.Errorf("can't allocate worker, retry attempt failed: %v", err)),
+ })
+ continue
+ }
+
+ // reallocated
+ goto done
+ }
+ }
}
+done:
// add worker to Wait
ww.addToWatch(sw)
+ ww.Lock()
// add new worker to the workers slice (to get information about workers in parallel)
ww.workers = append(ww.workers, sw)
-
- // unlock Allocate mutex
ww.Unlock()
+
// push the worker to the container
ww.Release(sw)
return nil
@@ -160,7 +201,7 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
for i := 0; i < len(ww.workers); i++ {
if ww.workers[i].Pid() == pid {
ww.workers = append(ww.workers[:i], ww.workers[i+1:]...)
- // kill worker
+ // kill worker, just to be sure it's dead
_ = wb.Kill()
return
}
@@ -177,7 +218,7 @@ func (ww *workerWatcher) Release(w worker.BaseProcess) {
}
}
-// Destroy all underlying container (but let them to complete the task)
+// Destroy all underlying container (but let them complete the task)
func (ww *workerWatcher) Destroy(_ context.Context) {
// destroy container, we don't use ww mutex here, since we should be able to push worker
ww.Lock()
@@ -192,7 +233,7 @@ func (ww *workerWatcher) Destroy(_ context.Context) {
case <-tt.C:
ww.Lock()
// that might be one of the workers is working
- if ww.numWorkers != uint64(len(ww.workers)) {
+ if atomic.LoadUint64(ww.numWorkers) != uint64(len(ww.workers)) {
ww.Unlock()
continue
}
@@ -216,6 +257,10 @@ func (ww *workerWatcher) List() []worker.BaseProcess {
ww.RLock()
defer ww.RUnlock()
+ if len(ww.workers) == 0 {
+ return nil
+ }
+
base := make([]worker.BaseProcess, 0, len(ww.workers))
for i := 0; i < len(ww.workers); i++ {
base = append(base, ww.workers[i])
@@ -253,6 +298,11 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
Event: events.EventPoolError,
Payload: errors.E(op, err),
})
+
+ // no workers at all, panic
+ if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {
+ panic(errors.E(op, errors.WorkerAllocate, errors.Errorf("can't allocate workers: %v", err)))
+ }
}
}