author     Valery Piashchynski <[email protected]>   2021-06-26 09:37:27 +0300
committer  GitHub <[email protected]>                 2021-06-26 09:37:27 +0300
commit     fc540f6029772ff51913b8ee3c082f8197010c52 (patch)
tree       6f92954e4410ff6351e189338daf9fc4dc2feaea
parent     e9249c7896331bab97a18a7ee0db17803fdd31fb (diff)
parent     744bf2e237e92e6ecddd3846dcb9a6b66967f67a (diff)
#738 fix(allocator): workers were not reallocating when `exec_ttl` was hit (tag: v2.3.1-rc.1)
-rw-r--r--  CHANGELOG.md                                              |   2
-rwxr-xr-x  pkg/pool/static_pool.go                                   |   2
-rwxr-xr-x  pkg/pool/supervisor_pool.go                               | 105
-rw-r--r--  pkg/pool/supervisor_test.go                               |  48
-rwxr-xr-x  pkg/worker/sync_worker.go                                 |   9
-rw-r--r--  pkg/worker_watcher/container/interface.go                 |   8
-rw-r--r--  pkg/worker_watcher/container/vec.go                       |  14
-rwxr-xr-x  pkg/worker_watcher/worker_watcher.go                      | 122
-rw-r--r--  tests/plugins/http/configs/.rr-http-supervised-pool.yaml  |   8
-rw-r--r--  tests/plugins/http/http_plugin_test.go                    |   2
-rw-r--r--  tests/psr-worker-slow.php                                 |  29
11 files changed, 191 insertions, 158 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ea55a10d..1e68b285 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,8 @@ v2.3.1 (_.06.2021)
 
 ## 🩹 Fixes:
 
+- 🐛 Fix: Bug with a channel deadlock when `exec_ttl` was used and the TTL limit was reached [PR](https://github.com/spiral/roadrunner/pull/738)
+- 🐛 Fix: Bug with the healthcheck endpoint when workers were marked as invalid and stayed in that state until the next request [PR](https://github.com/spiral/roadrunner/pull/738)
 - 🐛 Fix: Bugs with `boltdb` storage: [Boom](https://github.com/spiral/roadrunner/issues/717), [Boom](https://github.com/spiral/roadrunner/issues/718), [Boom](https://github.com/spiral/roadrunner/issues/719)
 - 🐛 Fix: Bug with incorrect redis initialization and usage [Bug](https://github.com/spiral/roadrunner/issues/720)
 - 🐛 Fix: Bug, Goridge duplicate error messages [Bug](https://github.com/spiral/goridge/issues/128)
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index ab025fa1..e568661f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -174,7 +174,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
         return sp.execDebugWithTTL(ctx, p)
     }
 
-    ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+    ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
     defer cancel()
     w, err := sp.getWorker(ctxAlloc, op)
     if err != nil {
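
Editor's note: the static_pool.go change above detaches the worker-allocation context from the request context. A context derived with context.WithTimeout can never outlive its parent's deadline, so deriving the allocation context from the request context meant that when `exec_ttl` expired, the in-flight reallocation was cancelled along with it. A minimal standalone sketch of that behavior (illustrative only, not RoadRunner code):

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // parent plays the role of the request context with exec_ttl applied
    parent, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()

    // old behavior: a 1-minute allocate budget is silently capped at ~100ms,
    // because a child context inherits the earlier parent deadline
    capped, cancelCapped := context.WithTimeout(parent, time.Minute)
    defer cancelCapped()
    d1, _ := capped.Deadline()
    fmt.Println("derived from request ctx, expires in:", time.Until(d1).Round(time.Millisecond))

    // new behavior: allocation gets its full, independent budget
    free, cancelFree := context.WithTimeout(context.Background(), time.Minute)
    defer cancelFree()
    d2, _ := free.Deadline()
    fmt.Println("derived from Background, expires in:", time.Until(d2).Round(time.Second))
}
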
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index ca61dbc4..b09b6f6c 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -43,47 +43,8 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig)
     return sp
 }
 
-type ttlExec struct {
-    err error
-    p   payload.Payload
-}
-
-func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
-    const op = errors.Op("supervised_exec_with_context")
-    if sp.cfg.ExecTTL == 0 {
-        return sp.pool.Exec(rqs)
-    }
-
-    c := make(chan ttlExec, 1)
-    ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
-    defer cancel()
-    go func() {
-        res, err := sp.pool.execWithTTL(ctx, rqs)
-        if err != nil {
-            c <- ttlExec{
-                err: errors.E(op, err),
-                p:   payload.Payload{},
-            }
-        }
-
-        c <- ttlExec{
-            err: nil,
-            p:   res,
-        }
-    }()
-
-    for {
-        select {
-        case <-ctx.Done():
-            return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
-        case res := <-c:
-            if res.err != nil {
-                return payload.Payload{}, res.err
-            }
-
-            return res.p, nil
-        }
-    }
+func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) {
+    panic("used to satisfy pool interface")
 }
 
 func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
@@ -92,36 +53,15 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
         return sp.pool.Exec(rqs)
     }
 
-    c := make(chan ttlExec, 1)
     ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
     defer cancel()
-    go func() {
-        res, err := sp.pool.execWithTTL(ctx, rqs)
-        if err != nil {
-            c <- ttlExec{
-                err: errors.E(op, err),
-                p:   payload.Payload{},
-            }
-        }
-
-        c <- ttlExec{
-            err: nil,
-            p:   res,
-        }
-    }()
-
-    for {
-        select {
-        case <-ctx.Done():
-            return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
-        case res := <-c:
-            if res.err != nil {
-                return payload.Payload{}, res.err
-            }
-            return res.p, nil
-        }
+    res, err := sp.pool.execWithTTL(ctx, rqs)
+    if err != nil {
+        return payload.Payload{}, errors.E(op, err)
     }
+
+    return res, nil
 }
 
 func (sp *supervised) GetConfig() interface{} {
@@ -164,7 +104,7 @@ func (sp *supervised) Stop() {
     sp.stopCh <- struct{}{}
 }
 
-func (sp *supervised) control() {
+func (sp *supervised) control() { //nolint:gocognit
     now := time.Now()
 
     // MIGHT BE OUTDATED
@@ -172,7 +112,16 @@ func (sp *supervised) control() {
     workers := sp.pool.Workers()
 
     for i := 0; i < len(workers); i++ {
-        if workers[i].State().Value() == worker.StateInvalid {
+        // if the worker is not in the Ready OR Working state,
+        // skip it
+        switch workers[i].State().Value() {
+        case
+            worker.StateInvalid,
+            worker.StateErrored,
+            worker.StateDestroyed,
+            worker.StateInactive,
+            worker.StateStopped,
+            worker.StateStopping:
             continue
         }
 
@@ -183,12 +132,23 @@ func (sp *supervised) control() {
         }
 
         if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
-            workers[i].State().Set(worker.StateInvalid)
+            // SOFT termination. DO NOT STOP active workers
+            if workers[i].State().Value() != worker.StateWorking {
+                workers[i].State().Set(worker.StateInvalid)
+                _ = workers[i].Stop()
+            }
             sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
             continue
         }
 
         if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+            // SOFT termination. DO NOT STOP active workers
+            if workers[i].State().Value() != worker.StateWorking {
+                workers[i].State().Set(worker.StateInvalid)
+                _ = workers[i].Stop()
+            }
+
+            // mark it as invalid; the worker is likely in StateWorking, so it will be killed after the work is done
             workers[i].State().Set(worker.StateInvalid)
             sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
             continue
@@ -230,6 +190,11 @@ func (sp *supervised) control() {
         // After the control check, res will be 5, idle is 1
         // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
         if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+            if workers[i].State().Value() != worker.StateWorking {
+                workers[i].State().Set(worker.StateInvalid)
+                _ = workers[i].Stop()
+            }
+
             workers[i].State().Set(worker.StateInvalid)
             sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
         }
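
Editor's note on the removed channel wrapper: the deadlock the changelog mentions follows from the double send. When `exec_ttl` fired, the caller returned through ctx.Done() and stopped draining the size-1 channel; the worker goroutine then sent the error (filling the buffer) and, with no `return` after that send, blocked forever on the unconditional second send, leaking the goroutine and the worker it referenced. A self-contained sketch of the bug pattern (hypothetical names, not the actual pool API):

package main

import (
    "context"
    "errors"
    "fmt"
    "runtime"
    "time"
)

type result struct {
    p   string
    err error
}

// exec mimics the removed wrapper: on error the goroutine sends twice into a
// channel of capacity 1. Once the caller has left via ctx.Done(), the first
// send fills the buffer and the second send blocks forever.
func exec(ctx context.Context) (string, error) {
    c := make(chan result, 1)
    go func() {
        <-ctx.Done()                          // the "work" outlives exec_ttl
        time.Sleep(time.Millisecond)          // let the caller take the ctx.Done branch first
        err := errors.New("worker timed out") // the inner call reports the ctx error
        if err != nil {
            c <- result{err: err} // buffered send succeeds; nobody is reading anymore
            // BUG: missing `return` here
        }
        c <- result{p: "response"} // blocks forever: buffer full, reader gone
    }()

    select {
    case <-ctx.Done():
        return "", ctx.Err() // exec_ttl hit: caller gives up and stops draining c
    case r := <-c:
        return r.p, r.err
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
    defer cancel()
    _, err := exec(ctx)
    time.Sleep(50 * time.Millisecond) // give the goroutine time to get stuck
    fmt.Println(err, "| goroutines still alive:", runtime.NumGoroutine())
}

The patched Exec avoids the problem entirely by calling execWithTTL inline and letting the context carry the timeout, which is why the helper goroutine, the channel, and the select loop could all be deleted.
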
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index dc307c33..513d369f 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -108,7 +108,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
     pid := p.Workers()[0].Pid()
 
-    resp, err := p.execWithTTL(context.Background(), payload.Payload{
+    resp, err := p.Exec(payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
@@ -148,7 +148,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
     pid := p.Workers()[0].Pid()
 
-    resp, err := p.execWithTTL(context.Background(), payload.Payload{
+    resp, err := p.Exec(payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
@@ -160,7 +160,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
     time.Sleep(time.Second * 5)
 
     // worker should be marked as invalid and reallocated
-    _, err = p.execWithTTL(context.Background(), payload.Payload{
+    _, err = p.Exec(payload.Payload{
         Context: []byte(""),
         Body:    []byte("foo"),
     })
@@ -170,6 +170,48 @@ func TestSupervisedPool_Idle(t *testing.T) {
     p.Destroy(context.Background())
 }
 
+func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
+    var cfgExecTTL = Config{
+        NumWorkers:      uint64(1),
+        AllocateTimeout: time.Second,
+        DestroyTimeout:  time.Second,
+        Supervisor: &SupervisorConfig{
+            WatchTick:       1 * time.Second,
+            TTL:             1 * time.Second,
+            IdleTTL:         1 * time.Second,
+            MaxWorkerMemory: 100,
+        },
+    }
+    ctx := context.Background()
+    p, err := Initialize(
+        ctx,
+        func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") },
+        pipe.NewPipeFactory(),
+        cfgExecTTL,
+    )
+
+    assert.NoError(t, err)
+    assert.NotNil(t, p)
+    defer p.Destroy(context.Background())
+
+    pid := p.Workers()[0].Pid()
+
+    time.Sleep(time.Millisecond * 100)
+    resp, err := p.Exec(payload.Payload{
+        Context: []byte(""),
+        Body:    []byte("foo"),
+    })
+
+    assert.NoError(t, err)
+    assert.Empty(t, resp.Body)
+    assert.Empty(t, resp.Context)
+
+    time.Sleep(time.Second * 2)
+    // should be destroyed, state should be Ready, not Invalid
+    assert.NotEqual(t, pid, p.Workers()[0].Pid())
+    assert.Equal(t, int64(1), p.Workers()[0].State().Value())
+}
+
 func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
     var cfgExecTTL = Config{
         NumWorkers: uint64(1),
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 13e70f49..84ff5977 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -111,6 +111,15 @@ func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (p
             return
         }
 
+        if tw.process.State().Value() != StateWorking {
+            tw.process.State().RegisterExec()
+            c <- wexec{
+                payload: rsp,
+                err:     nil,
+            }
+            return
+        }
+
         tw.process.State().Set(StateReady)
         tw.process.State().RegisterExec()
diff --git a/pkg/worker_watcher/container/interface.go b/pkg/worker_watcher/container/interface.go
index 532bace9..e10ecdae 100644
--- a/pkg/worker_watcher/container/interface.go
+++ b/pkg/worker_watcher/container/interface.go
@@ -1,13 +1,17 @@
 package container
 
-import "github.com/spiral/roadrunner/v2/pkg/worker"
+import (
+    "context"
+
+    "github.com/spiral/roadrunner/v2/pkg/worker"
+)
 
 // Vector interface represents vector container
 type Vector interface {
     // Enqueue used to put worker to the vector
     Enqueue(worker.BaseProcess)
     // Dequeue used to get worker from the vector
-    Dequeue() (worker.BaseProcess, bool)
+    Dequeue(ctx context.Context) (worker.BaseProcess, error)
     // Destroy used to stop releasing the workers
     Destroy()
 }
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index 565b1b69..b9150c43 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -1,8 +1,10 @@
 package container
 
 import (
+    "context"
     "sync/atomic"
 
+    "github.com/spiral/errors"
     "github.com/spiral/roadrunner/v2/pkg/worker"
 )
 
@@ -24,18 +26,24 @@ func (v *Vec) Enqueue(w worker.BaseProcess) {
     v.workers <- w
 }
 
-func (v *Vec) Dequeue() (worker.BaseProcess, bool) {
+func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) {
     /*
         if *addr == old {
             *addr = new
             return true
         }
     */
+
     if atomic.CompareAndSwapUint64(&v.destroy, 1, 1) {
-        return nil, true
+        return nil, errors.E(errors.WatcherStopped)
     }
 
-    return <-v.workers, false
+    select {
+    case w := <-v.workers:
+        return w, nil
+    case <-ctx.Done():
+        return nil, errors.E(ctx.Err(), errors.NoFreeWorkers)
+    }
 }
 
 func (v *Vec) Destroy() {
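
Editor's note: the container change replaces the blocking Dequeue() (worker, bool) with a context-aware variant, so a caller that cannot get a worker within its allocate timeout receives a NoFreeWorkers error instead of hanging on an empty channel. A trimmed-down sketch of the pattern (simplified types; the real code returns worker.BaseProcess and wraps errors with spiral/errors):

package main

import (
    "context"
    "errors"
    "fmt"
    "sync/atomic"
    "time"
)

var errWatcherStopped = errors.New("watcher stopped")

// vec is a trimmed-down analogue of the patched container: a buffered
// channel of items plus an atomic destroy flag.
type vec struct {
    items   chan string
    destroy uint64
}

// Dequeue returns an item, or an error once the destroy flag is set or
// the caller's context expires, instead of blocking forever.
func (v *vec) Dequeue(ctx context.Context) (string, error) {
    // equivalent to the CompareAndSwapUint64(&destroy, 1, 1) in the patch:
    // effectively an atomic read of the destroy flag
    if atomic.LoadUint64(&v.destroy) == 1 {
        return "", errWatcherStopped
    }
    select {
    case w := <-v.items:
        return w, nil
    case <-ctx.Done(): // no free workers within the allotted time
        return "", fmt.Errorf("no free workers: %w", ctx.Err())
    }
}

func main() {
    v := &vec{items: make(chan string, 1)}
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    _, err := v.Dequeue(ctx) // container is empty: returns after ~50ms
    fmt.Println(err)
}

Note that CompareAndSwapUint64(&v.destroy, 1, 1) in the patch succeeds only when the flag is already 1, so it behaves as an atomic load; the sketch uses atomic.LoadUint64 for clarity.
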
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 108756fc..f82de958 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -47,88 +47,64 @@ func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
     return nil
 }
 
-// return value from Get
-type get struct {
-    w   worker.BaseProcess
-    err error
-}
-
 // Get is not a thread safe operation
 func (ww *workerWatcher) Get(ctx context.Context) (worker.BaseProcess, error) {
-    c := make(chan get, 1)
     const op = errors.Op("worker_watcher_get_free_worker")
-    go func() {
-        // FAST PATH
-        // thread safe operation
-        w, stop := ww.container.Dequeue()
-        if stop {
-            c <- get{
-                nil,
-                errors.E(op, errors.WatcherStopped),
-            }
-            return
-        }
-        // fast path, worker not nil and in the ReadyState
-        if w.State().Value() == worker.StateReady {
-            c <- get{
-                w,
-                nil,
-            }
-            return
+    // thread safe operation
+    w, err := ww.container.Dequeue(ctx)
+    if errors.Is(errors.WatcherStopped, err) {
+        return nil, errors.E(op, errors.WatcherStopped)
+    }
+
+    if err != nil {
+        return nil, errors.E(op, err)
+    }
+
+    // fast path, worker not nil and in the ReadyState
+    if w.State().Value() == worker.StateReady {
+        return w, nil
+    }
+
+    // =========================================================
+    // SLOW PATH
+    _ = w.Kill() // how did the worker get here???????
+    // no free workers in the container,
+    // try to continuously get a free one
+    for {
+        w, err = ww.container.Dequeue(ctx)
+
+        if errors.Is(errors.WatcherStopped, err) {
+            return nil, errors.E(op, errors.WatcherStopped)
         }
-        // =========================================================
-        // SLOW PATH
-        _ = w.Kill() // how the worker get here???????
-        // no free workers in the container
-        // try to continuously get free one
-        for {
-            w, stop = ww.container.Dequeue()
-            if stop {
-                c <- get{
-                    nil,
-                    errors.E(op, errors.WatcherStopped),
-                }
-            }
-            switch w.State().Value() {
-            // return only workers in the Ready state
-            // check first
-            case worker.StateReady:
-                c <- get{
-                    w,
-                    nil,
-                }
-                return
-            case worker.StateWorking: // how??
-                ww.container.Enqueue(w) // put it back, let worker finish the work
-                continue
-            case
-                // all the possible wrong states
-                worker.StateInactive,
-                worker.StateDestroyed,
-                worker.StateErrored,
-                worker.StateStopped,
-                worker.StateInvalid,
-                worker.StateKilling,
-                worker.StateStopping:
-                // worker doing no work because it in the container
-                // so we can safely kill it (inconsistent state)
-                _ = w.Kill()
-                // try to get new worker
-                continue
-            }
+        if err != nil {
+            return nil, errors.E(op, err)
         }
-    }()
 
-    select {
-    case r := <-c:
-        if r.err != nil {
-            return nil, r.err
+        switch w.State().Value() {
+        // return only workers in the Ready state
+        // check first
+        case worker.StateReady:
+            return w, nil
+        case worker.StateWorking: // how??
+            ww.container.Enqueue(w) // put it back, let the worker finish its work
+            continue
+        case
+            // all the possible wrong states
+            worker.StateInactive,
+            worker.StateDestroyed,
+            worker.StateErrored,
+            worker.StateStopped,
+            worker.StateInvalid,
+            worker.StateKilling,
+            worker.StateStopping:
+            // the worker is doing no work because it is in the container,
+            // so we can safely kill it (inconsistent state)
+            _ = w.Kill()
+            // try to get a new worker
+            continue
         }
-        return r.w, nil
-    case <-ctx.Done():
-        return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the container, timeout exceed"))
     }
 }
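
Editor's note: the reworked Get now runs inline instead of in a helper goroutine, since the context-aware Dequeue supplies the cancellation that the old select-on-channel wrapper provided. The slow path is a filtering loop: return Ready workers, re-enqueue Working ones, and kill anything found idle in a wrong state. A compact standalone sketch of that filtering-dequeue loop (generic types, hypothetical names):

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

type state int

const (
    stateReady state = iota
    stateWorking
    stateInvalid
)

type proc struct{ st state }

// getReady mirrors the reworked Get: dequeue with the caller's context,
// hand out only Ready processes, put busy ones back so they can finish,
// and discard anything idling in the queue in a wrong state.
func getReady(ctx context.Context, q chan *proc) (*proc, error) {
    for {
        var p *proc
        select {
        case p = <-q:
        case <-ctx.Done():
            return nil, errors.New("no free workers in the container: " + ctx.Err().Error())
        }

        switch p.st {
        case stateReady:
            return p, nil
        case stateWorking:
            q <- p // put it back, let it finish its work
        default:
            // idle in the container with an inconsistent state: drop it
            // (the real code calls w.Kill() here)
        }
    }
}

func main() {
    q := make(chan *proc, 4)
    q <- &proc{st: stateInvalid} // will be discarded
    q <- &proc{st: stateReady}   // will be returned

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    p, err := getReady(ctx, q)
    fmt.Println(p != nil && p.st == stateReady, err)
}
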
diff --git a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
index e0c38c12..8d4d81d9 100644
--- a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
+++ b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
@@ -2,17 +2,13 @@ rpc:
   listen: tcp://127.0.0.1:15432
 server:
   command: "php ../../http/client.php echo pipes"
-  user: ""
-  group: ""
-  env:
-    "RR_HTTP": "true"
   relay: "pipes"
   relay_timeout: "20s"
 
 http:
   address: 127.0.0.1:18888
   max_request_size: 1024
-  middleware: [ "" ]
+  middleware: []
   uploads:
     forbid: [ ".php", ".exe", ".bat" ]
   trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
@@ -29,4 +25,4 @@ http:
     max_worker_memory: 100
 logs:
   mode: development
-  level: error
\ No newline at end of file
+  level: error
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index fcedb943..c3949911 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -1481,6 +1481,8 @@ func informerTestAfter(t *testing.T) {
     assert.NotZero(t, workerPid)
 
+    time.Sleep(time.Second * 5)
+
     err = client.Call("informer.Workers", "http", &list)
     assert.NoError(t, err)
     assert.Len(t, list.Workers, 1)
diff --git a/tests/psr-worker-slow.php b/tests/psr-worker-slow.php
new file mode 100644
index 00000000..153dff68
--- /dev/null
+++ b/tests/psr-worker-slow.php
@@ -0,0 +1,29 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+ini_set('display_errors', 'stderr');
+require __DIR__ . "/vendor/autoload.php";
+
+$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$psr7 = new RoadRunner\Http\PSR7Worker(
+    $worker,
+    new \Nyholm\Psr7\Factory\Psr17Factory(),
+    new \Nyholm\Psr7\Factory\Psr17Factory(),
+    new \Nyholm\Psr7\Factory\Psr17Factory()
+);
+
+while ($req = $psr7->waitRequest()) {
+    try {
+        $resp = new \Nyholm\Psr7\Response();
+        sleep(mt_rand(1, 20));
+        $resp->getBody()->write("hello world");
+
+        $psr7->respond($resp);
+    } catch (\Throwable $e) {
+        $psr7->getWorker()->error((string)$e);
+    }
+}