author    Valery Piashchynski <[email protected]>  2021-06-26 09:37:27 +0300
committer GitHub <[email protected]>  2021-06-26 09:37:27 +0300
commit    fc540f6029772ff51913b8ee3c082f8197010c52 (patch)
tree      6f92954e4410ff6351e189338daf9fc4dc2feaea
parent    e9249c7896331bab97a18a7ee0db17803fdd31fb (diff)
parent    744bf2e237e92e6ecddd3846dcb9a6b66967f67a (diff)
#738 fix(allocator): workers were not reallocating when `exec_ttl` was hit (tag: v2.3.1-rc.1)
-rw-r--r--   CHANGELOG.md                                                2
-rwxr-xr-x   pkg/pool/static_pool.go                                     2
-rwxr-xr-x   pkg/pool/supervisor_pool.go                               105
-rw-r--r--   pkg/pool/supervisor_test.go                                48
-rwxr-xr-x   pkg/worker/sync_worker.go                                   9
-rw-r--r--   pkg/worker_watcher/container/interface.go                   8
-rw-r--r--   pkg/worker_watcher/container/vec.go                        14
-rwxr-xr-x   pkg/worker_watcher/worker_watcher.go                      122
-rw-r--r--   tests/plugins/http/configs/.rr-http-supervised-pool.yaml    8
-rw-r--r--   tests/plugins/http/http_plugin_test.go                      2
-rw-r--r--   tests/psr-worker-slow.php                                  29
11 files changed, 191 insertions, 158 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ea55a10d..1e68b285 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,8 @@ v2.3.1 (_.06.2021)
## 🩹 Fixes:
+- 🐛 Fix: Bug with channel deadlock when `exec_ttl` was used and TTL limit reached [PR](https://github.com/spiral/roadrunner/pull/738)
+- 🐛 Fix: Bug with healthcheck endpoint when workers were marked as invalid and stayed in that state until the next request [PR](https://github.com/spiral/roadrunner/pull/738)
- 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717), [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
- 🐛 Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720)
- 🐛 Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128)
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index ab025fa1..e568661f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -174,7 +174,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
return sp.execDebugWithTTL(ctx, p)
}
- ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
w, err := sp.getWorker(ctxAlloc, op)
if err != nil {
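
Why this one-line change matters: previously the allocation deadline was derived from the incoming request context, so once that context was cancelled (for example after an `exec_ttl` timeout), every attempt to allocate a replacement worker failed instantly and the pool never recovered. Deriving the deadline from context.Background() gives allocation its own clock. A minimal, self-contained sketch of the difference; the allocate helper is hypothetical, standing in for the pool's worker allocation:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// allocate is a stand-in for worker allocation; it respects ctx.
func allocate(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // pretend spawning takes 50ms
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// a request context that already hit its exec_ttl deadline
	reqCtx, cancel := context.WithCancel(context.Background())
	cancel()

	// OLD: a deadline derived from the dead request context fails at once,
	// so a replacement worker can never be allocated
	ctxAlloc, c1 := context.WithTimeout(reqCtx, time.Second)
	defer c1()
	fmt.Println(errors.Is(allocate(ctxAlloc), context.Canceled)) // true

	// NEW: allocation gets an independent deadline and succeeds
	ctxAlloc2, c2 := context.WithTimeout(context.Background(), time.Second)
	defer c2()
	fmt.Println(allocate(ctxAlloc2)) // <nil>
}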
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index ca61dbc4..b09b6f6c 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -43,47 +43,8 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig)
return sp
}
-type ttlExec struct {
- err error
- p payload.Payload
-}
-
-func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
- const op = errors.Op("supervised_exec_with_context")
- if sp.cfg.ExecTTL == 0 {
- return sp.pool.Exec(rqs)
- }
-
- c := make(chan ttlExec, 1)
- ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
- defer cancel()
- go func() {
- res, err := sp.pool.execWithTTL(ctx, rqs)
- if err != nil {
- c <- ttlExec{
- err: errors.E(op, err),
- p: payload.Payload{},
- }
- }
-
- c <- ttlExec{
- err: nil,
- p: res,
- }
- }()
-
- for {
- select {
- case <-ctx.Done():
- return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
- case res := <-c:
- if res.err != nil {
- return payload.Payload{}, res.err
- }
-
- return res.p, nil
- }
- }
+func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) {
+ panic("used to satisfy pool interface")
}
func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
@@ -92,36 +53,15 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
return sp.pool.Exec(rqs)
}
- c := make(chan ttlExec, 1)
ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
defer cancel()
- go func() {
- res, err := sp.pool.execWithTTL(ctx, rqs)
- if err != nil {
- c <- ttlExec{
- err: errors.E(op, err),
- p: payload.Payload{},
- }
- }
-
- c <- ttlExec{
- err: nil,
- p: res,
- }
- }()
-
- for {
- select {
- case <-ctx.Done():
- return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
- case res := <-c:
- if res.err != nil {
- return payload.Payload{}, res.err
- }
- return res.p, nil
- }
+ res, err := sp.pool.execWithTTL(ctx, rqs)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
}
+
+ return res, nil
}
func (sp *supervised) GetConfig() interface{} {
@@ -164,7 +104,7 @@ func (sp *supervised) Stop() {
sp.stopCh <- struct{}{}
}
-func (sp *supervised) control() {
+func (sp *supervised) control() { //nolint:gocognit
now := time.Now()
// MIGHT BE OUTDATED
@@ -172,7 +112,16 @@ func (sp *supervised) control() {
workers := sp.pool.Workers()
for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == worker.StateInvalid {
+ // if the worker is not in the Ready or Working state,
+ // skip it
+ switch workers[i].State().Value() {
+ case
+ worker.StateInvalid,
+ worker.StateErrored,
+ worker.StateDestroyed,
+ worker.StateInactive,
+ worker.StateStopped,
+ worker.StateStopping:
continue
}
@@ -183,12 +132,23 @@ func (sp *supervised) control() {
}
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- workers[i].State().Set(worker.StateInvalid)
+ // SOFT termination. DO NOT STOP active workers
+ if workers[i].State().Value() != worker.StateWorking {
+ workers[i].State().Set(worker.StateInvalid)
+ _ = workers[i].Stop()
+ }
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ // SOFT termination. DO NOT STOP active workers
+ if workers[i].State().Value() != worker.StateWorking {
+ workers[i].State().Set(worker.StateInvalid)
+ _ = workers[i].Stop()
+ }
+
+ // mark it as invalid; the worker is likely in StateWorking, so it will be killed once the work is done
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
@@ -230,6 +190,11 @@ func (sp *supervised) control() {
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+ if workers[i].State().Value() != worker.StateWorking {
+ workers[i].State().Set(worker.StateInvalid)
+ _ = workers[i].Stop()
+ }
+
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
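
The deleted wrapper above is where the `exec_ttl` deadlock lived: on the error path the goroutine sent into the capacity-1 channel and then, lacking a return, fell through to a second send; once the outer select had exited via ctx.Done() (TTL reached), that second send blocked forever. Since pool.execWithTTL already honors the timeout context, the whole goroutine/select indirection could be removed. A condensed reconstruction of the leak, with simplified names and string payloads standing in for payload.Payload:

package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

type ttlExec struct {
	err error
	val string
}

// exec simulates a handler that overruns exec_ttl: it returns shortly
// after the context is cancelled, carrying the cancellation error.
func exec(ctx context.Context) (string, error) {
	<-ctx.Done()
	time.Sleep(10 * time.Millisecond) // the worker notices the timeout late
	return "", ctx.Err()
}

// leaky mirrors the removed wrapper. On error the goroutine sends twice
// into a channel of capacity 1 (note the missing `return` after the
// first send); once the select below has exited via ctx.Done(), the
// second send blocks the goroutine forever.
func leaky(ctx context.Context) (string, error) {
	c := make(chan ttlExec, 1)
	go func() {
		val, err := exec(ctx)
		if err != nil {
			c <- ttlExec{err: err}
			// BUG: no `return` here in the original
		}
		c <- ttlExec{val: val} // blocks forever after a timeout
	}()
	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case res := <-c:
		return res.val, res.err
	}
}

func main() {
	before := runtime.NumGoroutine()

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	_, err := leaky(ctx)
	fmt.Println("leaky returned:", err)

	time.Sleep(100 * time.Millisecond) // the spawned goroutine never exits
	fmt.Println("goroutines leaked:", runtime.NumGoroutine()-before) // 1

	// the fix: no wrapper at all, just `return pool.execWithTTL(ctx, rqs)`
}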
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index dc307c33..513d369f 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -108,7 +108,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.execWithTTL(context.Background(), payload.Payload{
+ resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -148,7 +148,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.execWithTTL(context.Background(), payload.Payload{
+ resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -160,7 +160,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
time.Sleep(time.Second * 5)
// worker should be marked as invalid and reallocated
- _, err = p.execWithTTL(context.Background(), payload.Payload{
+ _, err = p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -170,6 +170,48 @@ func TestSupervisedPool_Idle(t *testing.T) {
p.Destroy(context.Background())
}
+func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 1 * time.Second,
+ IdleTTL: 1 * time.Second,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ time.Sleep(time.Millisecond * 100)
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 2)
+ // the old worker should have been destroyed; the new one's state should be Ready, not Invalid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ assert.Equal(t, int64(1), p.Workers()[0].State().Value())
+}
+
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: uint64(1),
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 13e70f49..84ff5977 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -111,6 +111,15 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
return
}
+ if tw.process.State().Value() != StateWorking {
+ tw.process.State().RegisterExec()
+ c <- wexec{
+ payload: rsp,
+ err: nil,
+ }
+ return
+ }
+
tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
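
The guard added above is the healthcheck half of the fix: if the supervisor flipped the worker out of StateWorking while a request was in flight (TTL or memory limit hit), the post-exec code no longer overwrites that decision with StateReady. The Invalid mark survives, so the watcher can kill and reallocate the worker instead of leaving it invalid until the next request. A stripped-down sketch of the pattern, assuming a trivial atomic state holder (the real worker.State carries more):

package main

import (
	"fmt"
	"sync/atomic"
)

const (
	StateReady int64 = iota
	StateWorking
	StateInvalid
)

type State struct{ v int64 }

func (s *State) Value() int64 { return atomic.LoadInt64(&s.v) }
func (s *State) Set(v int64)  { atomic.StoreInt64(&s.v, v) }

// finishExec mirrors the added guard: only flip back to Ready if
// nothing changed the state while the request was being processed.
func finishExec(s *State) {
	if s.Value() != StateWorking {
		// the supervisor marked this worker Invalid mid-exec; keep it,
		// so the watcher disposes of it and allocates a fresh one
		return
	}
	s.Set(StateReady)
}

func main() {
	s := &State{v: StateWorking}
	s.Set(StateInvalid) // supervisor hits the TTL during exec
	finishExec(s)
	fmt.Println(s.Value() == StateInvalid) // true: the mark survives
}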
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
index 532bace9..e10ecdae 100644
--- a/pkg/worker_watcher/container/interface.go
+++ b/pkg/worker_watcher/container/interface.go
@@ -1,13 +1,17 @@
package container
-import "github.com/spiral/roadrunner/v2/pkg/worker"
+import (
+ "context"
+
+ "github.com/spiral/roadrunner/v2/pkg/worker"
+)
// Vector interface represents vector container
type Vector interface {
// Enqueue used to put worker to the vector
Enqueue(worker.BaseProcess)
// Dequeue used to get worker from the vector
- Dequeue() (worker.BaseProcess, bool)
+ Dequeue(ctx context.Context) (worker.BaseProcess, error)
// Destroy used to stop releasing the workers
Destroy()
}
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index 565b1b69..b9150c43 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -1,8 +1,10 @@
package container
import (
+ "context"
"sync/atomic"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -24,18 +26,24 @@ func (v *Vec) Enqueue(w worker.BaseProcess) {
v.workers <- w
}
-func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
+func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
/*
if *addr == old {
*addr = new
return true
}
*/
+
if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
- return nil, true
+ return nil, errors.E(errors.WatcherStopped)
}
- return <-v.workers, false
+ select {
+ case w := <-v.workers:
+ return w, nil
+ case <-ctx.Done():
+ return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+ }
}
func (v *Vec) Destroy() {
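
Dequeue becoming context-aware is what lets the watcher drop its goroutine-plus-channel wrapper in the next file: a bare `<-v.workers` blocks indefinitely, whereas a select over the channel and ctx.Done() turns "no free workers" into an ordinary, timely error. (The CompareAndSwapUint64(&v.destroy, 1, 1) call above is effectively an atomic equality check on the destroy flag.) A condensed, self-contained version of the new contract, with string workers standing in for worker.BaseProcess:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var ErrNoFreeWorkers = errors.New("no free workers")

type Vec struct{ workers chan string }

// Dequeue blocks for a worker but gives up when the caller's ctx expires.
func (v *Vec) Dequeue(ctx context.Context) (string, error) {
	select {
	case w := <-v.workers:
		return w, nil
	case <-ctx.Done():
		return "", fmt.Errorf("%w: %v", ErrNoFreeWorkers, ctx.Err())
	}
}

func main() {
	v := &Vec{workers: make(chan string, 1)}

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// empty container: instead of hanging forever, the caller gets an
	// error once the allocate timeout passes
	_, err := v.Dequeue(ctx)
	fmt.Println(errors.Is(err, ErrNoFreeWorkers)) // true
}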
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 108756fc..f82de958 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -47,88 +47,64 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
return nil
}
-// return value from Get
-type get struct {
- w worker.BaseProcess
- err error
-}
-
// Get is not a thread safe operation
func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
- c := make(chan get, 1)
const op = errors.Op("worker_watcher_get_free_worker")
- go func() {
- // FAST PATH
- // thread safe operation
- w, stop := ww.container.Dequeue()
- if stop {
- c <- get{
- nil,
- errors.E(op, errors.WatcherStopped),
- }
- return
- }
- // fast path, worker not nil and in the ReadyState
- if w.State().Value() == worker.StateReady {
- c <- get{
- w,
- nil,
- }
- return
+ // thread safe operation
+ w, err := ww.container.Dequeue(ctx)
+ if errors.Is(errors.WatcherStopped, err) {
+ return nil, errors.E(op, errors.WatcherStopped)
+ }
+
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // fast path, worker not nil and in the ReadyState
+ if w.State().Value() == worker.StateReady {
+ return w, nil
+ }
+
+ // =========================================================
+ // SLOW PATH
+ _ = w.Kill() // how did the worker get here?
+ // no free workers in the container
+ // try to continuously get free one
+ for {
+ w, err = ww.container.Dequeue(ctx)
+
+ if errors.Is(errors.WatcherStopped, err) {
+ return nil, errors.E(op, errors.WatcherStopped)
}
- // =========================================================
- // SLOW PATH
- _ = w.Kill() // how the worker get here???????
- // no free workers in the container
- // try to continuously get free one
- for {
- w, stop = ww.container.Dequeue()
- if stop {
- c <- get{
- nil,
- errors.E(op, errors.WatcherStopped),
- }
- }
- switch w.State().Value() {
- // return only workers in the Ready state
- // check first
- case worker.StateReady:
- c <- get{
- w,
- nil,
- }
- return
- case worker.StateWorking: // how??
- ww.container.Enqueue(w) // put it back, let worker finish the work
- continue
- case
- // all the possible wrong states
- worker.StateInactive,
- worker.StateDestroyed,
- worker.StateErrored,
- worker.StateStopped,
- worker.StateInvalid,
- worker.StateKilling,
- worker.StateStopping:
- // worker doing no work because it in the container
- // so we can safely kill it (inconsistent state)
- _ = w.Kill()
- // try to get new worker
- continue
- }
+ if err != nil {
+ return nil, errors.E(op, err)
}
- }()
- select {
- case r := <-c:
- if r.err != nil {
- return nil, r.err
+ switch w.State().Value() {
+ // return only workers in the Ready state
+ // check first
+ case worker.StateReady:
+ return w, nil
+ case worker.StateWorking: // how??
+ ww.container.Enqueue(w) // put it back, let worker finish the work
+ continue
+ case
+ // all the possible wrong states
+ worker.StateInactive,
+ worker.StateDestroyed,
+ worker.StateErrored,
+ worker.StateStopped,
+ worker.StateInvalid,
+ worker.StateKilling,
+ worker.StateStopping:
+ // the worker is doing no work because it is sitting in the container
+ // so we can safely kill it (inconsistent state)
+ _ = w.Kill()
+ // try to get new worker
+ continue
}
- return r.w, nil
- case <-ctx.Done():
- return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed"))
}
}
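
With Dequeue carrying the context, Get collapses into one linear loop: no result struct, no extra goroutine, no outer select racing a channel against ctx.Done(). A schematic of the flattened loop under the same simplifications as the previous sketch (toy states, no real OS processes):

package main

import (
	"context"
	"fmt"
	"time"
)

type wstate int

const (
	Ready wstate = iota
	Working
	Invalid
)

type proc struct{ state wstate }

type vec struct{ ch chan *proc }

func (v *vec) Enqueue(p *proc) { v.ch <- p }

func (v *vec) Dequeue(ctx context.Context) (*proc, error) {
	select {
	case p := <-v.ch:
		return p, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// get mirrors the rewritten worker_watcher.Get: dequeue, inspect the
// state, and either return, requeue, or dispose and retry.
func get(ctx context.Context, v *vec) (*proc, error) {
	for {
		p, err := v.Dequeue(ctx)
		if err != nil {
			return nil, err
		}
		switch p.state {
		case Ready:
			return p, nil // fast path
		case Working:
			v.Enqueue(p) // put it back; let it finish the work
		default:
			// wrong state while idle in the container: the real code
			// kills the process here, then tries the next worker
		}
	}
}

func main() {
	v := &vec{ch: make(chan *proc, 2)}
	v.Enqueue(&proc{state: Invalid}) // disposed of by the loop
	v.Enqueue(&proc{state: Ready})

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	p, err := get(ctx, v)
	fmt.Println(p.state == Ready, err) // true <nil>
}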
diff --git a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
index e0c38c12..8d4d81d9 100644
--- a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
+++ b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
@@ -2,17 +2,13 @@ rpc:
listen: tcp://127.0.0.1:15432
server:
command: "php ../../http/client.php echo pipes"
- user: ""
- group: ""
- env:
- "RR_HTTP": "true"
relay: "pipes"
relay_timeout: "20s"
http:
address: 127.0.0.1:18888
max_request_size: 1024
- middleware: [ "" ]
+ middleware: []
uploads:
forbid: [ ".php", ".exe", ".bat" ]
trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
@@ -29,4 +25,4 @@ http:
max_worker_memory: 100
logs:
mode: development
- level: error
\ No newline at end of file
+ level: error
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index fcedb943..c3949911 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -1481,6 +1481,8 @@ func informerTestAfter(t *testing.T) {
assert.NotZero(t, workerPid)
+ time.Sleep(time.Second * 5)
+
err = client.Call("informer.Workers", "http", &list)
assert.NoError(t, err)
assert.Len(t, list.Workers, 1)
diff --git a/tests/psr-worker-slow.php b/tests/psr-worker-slow.php
new file mode 100644
index 00000000..153dff68
--- /dev/null
+++ b/tests/psr-worker-slow.php
@@ -0,0 +1,29 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require __DIR__ . "/vendor/autoload.php";
+
+$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$psr7 = new RoadRunner\Http\PSR7Worker(
+ $worker,
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory(),
+ new \Nyholm\Psr7\Factory\Psr17Factory()
+);
+
+while ($req = $psr7->waitRequest()) {
+ try {
+ $resp = new \Nyholm\Psr7\Response();
+ sleep(mt_rand(1,20));
+ $resp->getBody()->write("hello world");
+
+ $psr7->respond($resp);
+ } catch (\Throwable $e) {
+ $psr7->getWorker()->error((string)$e);
+ }
+}