diff options
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 31 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 42 | ||||
-rw-r--r-- | pkg/worker_watcher/container/vec.go | 12 | ||||
-rw-r--r-- | tests/plugins/http/configs/.rr-http-supervised-pool.yaml | 8 | ||||
-rw-r--r-- | tests/plugins/http/http_plugin_test.go | 2 | ||||
-rw-r--r-- | tests/psr-worker-slow.php | 29 |
6 files changed, 108 insertions, 16 deletions
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index a1dd21ac..b09b6f6c 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -104,7 +104,7 @@ func (sp *supervised) Stop() { sp.stopCh <- struct{}{} } -func (sp *supervised) control() { +func (sp *supervised) control() { //nolint:gocognit now := time.Now() // MIGHT BE OUTDATED @@ -112,7 +112,16 @@ func (sp *supervised) control() { workers := sp.pool.Workers() for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == worker.StateInvalid { + // if worker not in the Ready OR working state + // skip such worker + switch workers[i].State().Value() { + case + worker.StateInvalid, + worker.StateErrored, + worker.StateDestroyed, + worker.StateInactive, + worker.StateStopped, + worker.StateStopping: continue } @@ -123,12 +132,23 @@ func (sp *supervised) control() { } if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { - workers[i].State().Set(worker.StateInvalid) + // SOFT termination. DO NOT STOP active workers + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + // SOFT termination. DO NOT STOP active workers + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + + // mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue @@ -170,6 +190,11 @@ func (sp *supervised) control() { // After the control check, res will be 5, idle is 1 // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index f371b925..513d369f 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -170,6 +170,48 @@ func TestSupervisedPool_Idle(t *testing.T) { p.Destroy(context.Background()) } +func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 1 * time.Second, + IdleTTL: 1 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + time.Sleep(time.Millisecond * 100) + resp, err := p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 2) + // should be destroyed, state should be Ready, not Invalid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + assert.Equal(t, int64(1), p.Workers()[0].State().Value()) +} + func TestSupervisedPool_ExecTTL_OK(t *testing.T) { var cfgExecTTL = Config{ NumWorkers: uint64(1), diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go index 8072af10..b9150c43 100644 --- a/pkg/worker_watcher/container/vec.go +++ b/pkg/worker_watcher/container/vec.go @@ -38,13 +38,11 @@ func (v *Vec) Dequeue(ctx context.Context) (worker.BaseProcess, error) { return nil, errors.E(errors.WatcherStopped) } - for { - select { - case w := <-v.workers: - return w, nil - case <-ctx.Done(): - return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) - } + select { + case w := <-v.workers: + return w, nil + case <-ctx.Done(): + return nil, errors.E(ctx.Err(), errors.NoFreeWorkers) } } diff --git a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml index e0c38c12..8d4d81d9 100644 --- a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml +++ b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml @@ -2,17 +2,13 @@ rpc: listen: tcp://127.0.0.1:15432 server: command: "php ../../http/client.php echo pipes" - user: "" - group: "" - env: - "RR_HTTP": "true" relay: "pipes" relay_timeout: "20s" http: address: 127.0.0.1:18888 max_request_size: 1024 - middleware: [ "" ] + middleware: [] uploads: forbid: [ ".php", ".exe", ".bat" ] trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] @@ -29,4 +25,4 @@ http: max_worker_memory: 100 logs: mode: development - level: error
\ No newline at end of file + level: error diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go index fcedb943..c3949911 100644 --- a/tests/plugins/http/http_plugin_test.go +++ b/tests/plugins/http/http_plugin_test.go @@ -1481,6 +1481,8 @@ func informerTestAfter(t *testing.T) { assert.NotZero(t, workerPid) + time.Sleep(time.Second * 5) + err = client.Call("informer.Workers", "http", &list) assert.NoError(t, err) assert.Len(t, list.Workers, 1) diff --git a/tests/psr-worker-slow.php b/tests/psr-worker-slow.php new file mode 100644 index 00000000..153dff68 --- /dev/null +++ b/tests/psr-worker-slow.php @@ -0,0 +1,29 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ +use Spiral\Goridge; +use Spiral\RoadRunner; + +ini_set('display_errors', 'stderr'); +require __DIR__ . "/vendor/autoload.php"; + +$worker = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$psr7 = new RoadRunner\Http\PSR7Worker( + $worker, + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory(), + new \Nyholm\Psr7\Factory\Psr17Factory() +); + +while ($req = $psr7->waitRequest()) { + try { + $resp = new \Nyholm\Psr7\Response(); + sleep(mt_rand(1,20)); + $resp->getBody()->write("hello world"); + + $psr7->respond($resp); + } catch (\Throwable $e) { + $psr7->getWorker()->error((string)$e); + } +} |