From e684ac16035bed9a4c09677b0db3b33477955dc9 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 24 Feb 2021 00:12:05 +0300 Subject: Fix pool behavior, update tests --- CHANGELOG.md | 7 +-- go.sum | 6 --- pkg/pool/interface.go | 6 +-- pkg/pool/static_pool.go | 10 ++-- pkg/pool/static_pool_test.go | 6 +-- pkg/pool/supervisor_pool.go | 63 ++++++++++++++-------- pkg/pool/supervisor_test.go | 42 ++++++--------- pkg/worker/interface.go | 2 +- pkg/worker/sync_worker.go | 2 +- tests/exec_ttl.php | 15 ++++++ tests/idle.php | 15 ++++++ tests/memleak.php | 2 +- .../http/configs/.rr-http-supervised-pool.yaml | 2 +- tests/plugins/http/http_plugin_test.go | 28 +++++++--- tests/sleep.php | 2 +- 15 files changed, 125 insertions(+), 83 deletions(-) create mode 100644 tests/exec_ttl.php create mode 100644 tests/idle.php diff --git a/CHANGELOG.md b/CHANGELOG.md index b11ffdad..3f31aca3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,12 @@ CHANGELOG v2.0.0-RC.4 (20.02.2021) ------------------- -- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550) -- Endure container update to v1.0.0-RC.2 version +- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550). +- Endure container update to v1.0.0-RC.2 version. - Remove unneeded mutex from the `http.Workers` method. -- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557) +- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557). - Add static, headers, status, gzip plugins to the `main.go`. +- Fix workers pool behavior -> idle_ttl, ttl, max_memory are soft errors and exec_ttl is hard error. v2.0.0-RC.3 (17.02.2021) ------------------- diff --git a/go.sum b/go.sum index 4bac6827..113aa09b 100644 --- a/go.sum +++ b/go.sum @@ -114,11 +114,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= -github.com/go-redis/redis/v8 v8.5.0 h1:L3r1Q3I5WOUdXZGCP6g44EruKh0u3n6co5Hl5xWkdGA= -github.com/go-redis/redis/v8 v8.5.0/go.mod h1:YmEcgBDttjnkbMzDAhDtQxY9yVA7jMN6PCR5HeMvqFE= github.com/go-redis/redis/v8 v8.6.0 h1:swqbqOrxaPztsj2Hf1p94M3YAgl7hYEpcw21z299hh8= github.com/go-redis/redis/v8 v8.6.0/go.mod h1:DQ9q4Rk2HtwkrwVrdgmphoOQDMfpvcd/nHEwRsicg8s= -github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68 h1:QR2R74UbwMtnEVGVvNfcx6mQmWGgN8abQeXOy92pQIo= github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -416,11 +413,8 @@ github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -github.com/yookoala/gofast v0.5.0 h1:QFGxVAf5ksx0eqFapzvc1ZDvk/LoYhbCyiw7WKoXtSA= -github.com/yookoala/gofast v0.5.0/go.mod h1:OJU201Q6HCaE1cASckaTbMm3KB6e0cZxK0mgqfwOKvQ= github.com/yookoala/gofast v0.6.0 h1:E5x2acfUD7GkzCf8bmIMwnV10VxDy5tUCHc5LGhluwc= github.com/yookoala/gofast v0.6.0/go.mod h1:OJU201Q6HCaE1cASckaTbMm3KB6e0cZxK0mgqfwOKvQ= -github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg= github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index bfc56c3f..4ef2f2e7 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -15,9 +15,6 @@ type Pool interface { // Exec executes task with payload Exec(rqs payload.Payload) (payload.Payload, error) - // ExecWithContext executes task with context which is used with timeout - ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) - // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) @@ -26,4 +23,7 @@ type Pool interface { // Destroy all underlying stack (but let them to complete the task). Destroy(ctx context.Context) + + // ExecWithContext executes task with context which is used with timeout + execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index f1b20bb9..0617cbc0 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -168,16 +168,16 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { } // Be careful, sync with pool.Exec method -func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("static_pool_exec_with_context") - ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) defer cancel() - w, err := sp.getWorker(ctxGetFree, op) + w, err := sp.getWorker(ctxAlloc, op) if err != nil { return payload.Payload{}, errors.E(op, err) } - rsp, err := w.(worker.SyncWorker).ExecWithTimeout(ctx, p) + rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) if err != nil { return sp.err_encoder(err, w) } @@ -185,7 +185,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p // worker want's to be terminated if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest { sp.stopWorker(w) - return sp.ExecWithContext(ctx, p) + return sp.execWithTTL(ctx, p) } err = sp.checkMaxJobs(w) diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index c8b3616c..b1318f9d 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -191,7 +191,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) time.Sleep(time.Second) - res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) @@ -518,11 +518,11 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.NotNil(t, p) go func() { - _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) }() time.Sleep(time.Second) - res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index bfb997d8..5abeae7a 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -48,7 +48,7 @@ type ttlExec struct { p payload.Payload } -func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { +func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { const op = errors.Op("supervised_exec_with_context") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) @@ -58,7 +58,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) defer cancel() go func() { - res, err := sp.pool.ExecWithContext(ctx, rqs) + res, err := sp.pool.execWithTTL(ctx, rqs) if err != nil { c <- ttlExec{ err: errors.E(op, err), @@ -86,13 +86,42 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) } } -func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("supervised_exec") - rsp, err := sp.pool.Exec(p) - if err != nil { - return payload.Payload{}, errors.E(op, err) +func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { + const op = errors.Op("supervised_exec_with_context") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + c := make(chan ttlExec, 1) + ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) + defer cancel() + go func() { + res, err := sp.pool.execWithTTL(ctx, rqs) + if err != nil { + c <- ttlExec{ + err: errors.E(op, err), + p: payload.Payload{}, + } + } + + c <- ttlExec{ + err: nil, + p: res, + } + }() + + for { + select { + case <-ctx.Done(): + return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) + case res := <-c: + if res.err != nil { + return payload.Payload{}, res.err + } + + return res.p, nil + } } - return rsp, nil } func (sp *supervised) GetConfig() interface{} { @@ -155,21 +184,13 @@ func (sp *supervised) control() { } if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { - err = sp.pool.RemoveWorker(workers[i]) - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) - return - } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { - err = sp.pool.RemoveWorker(workers[i]) - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) - return - } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue } @@ -210,11 +231,7 @@ func (sp *supervised) control() { // After the control check, res will be 5, idle is 1 // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { - err = sp.pool.RemoveWorker(workers[i]) - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) - return - } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 85af4672..d7e97fdd 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -9,7 +9,6 @@ import ( "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - "github.com/spiral/roadrunner/v2/tools" "github.com/stretchr/testify/assert" ) @@ -37,28 +36,8 @@ func TestSupervisedPool_Exec(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - stopCh := make(chan struct{}) - defer p.Destroy(context.Background()) - go func() { - for { - select { - case <-stopCh: - return - default: - workers := p.Workers() - if len(workers) > 0 { - s, err := tools.WorkerProcessState(workers[0]) - assert.NoError(t, err) - assert.NotNil(t, s) - // since this is soft limit, double max memory limit watch - if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 { - assert.Fail(t, "max memory reached, worker still alive") - } - } - } - } - }() + pidBefore := p.Workers()[0].Pid() for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 100) @@ -69,7 +48,9 @@ func TestSupervisedPool_Exec(t *testing.T) { assert.NoError(t, err) } - stopCh <- struct{}{} + assert.NotEqual(t, pidBefore, p.Workers()[0].Pid()) + + p.Destroy(context.Background()) } func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { @@ -99,7 +80,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + resp, err := p.execWithTTL(context.Background(), payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -129,7 +110,7 @@ func TestSupervisedPool_Idle(t *testing.T) { ctx := context.Background() p, err := Initialize( ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "../../tests/idle.php", "pipes") }, pipe.NewPipeFactory(), cfgExecTTL, ) @@ -139,7 +120,7 @@ func TestSupervisedPool_Idle(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + resp, err := p.execWithTTL(context.Background(), payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -149,6 +130,13 @@ func TestSupervisedPool_Idle(t *testing.T) { assert.Empty(t, resp.Context) time.Sleep(time.Second * 5) + + // worker should be marked as invalid and reallocated + _, err = p.execWithTTL(context.Background(), payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) // should be new worker with new pid assert.NotEqual(t, pid, p.Workers()[0].Pid()) p.Destroy(context.Background()) @@ -170,7 +158,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { ctx := context.Background() p, err := Initialize( ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, pipe.NewPipeFactory(), cfgExecTTL, ) diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go index 96eb25bc..2b68717a 100644 --- a/pkg/worker/interface.go +++ b/pkg/worker/interface.go @@ -70,5 +70,5 @@ type SyncWorker interface { // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs payload.Payload) (payload.Payload, error) // ExecWithContext used to handle Exec with TTL - ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) + ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 82a5462a..ac987c14 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -63,7 +63,7 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) diff --git a/tests/exec_ttl.php b/tests/exec_ttl.php new file mode 100644 index 00000000..fb5c9df2 --- /dev/null +++ b/tests/exec_ttl.php @@ -0,0 +1,15 @@ +waitPayload()){ + sleep(3); + $rr->respond(new \Spiral\RoadRunner\Payload("")); +} diff --git a/tests/idle.php b/tests/idle.php new file mode 100644 index 00000000..fb5c9df2 --- /dev/null +++ b/tests/idle.php @@ -0,0 +1,15 @@ +waitPayload()){ + sleep(3); + $rr->respond(new \Spiral\RoadRunner\Payload("")); +} diff --git a/tests/memleak.php b/tests/memleak.php index f2879e18..96ed5006 100644 --- a/tests/memleak.php +++ b/tests/memleak.php @@ -10,6 +10,6 @@ require __DIR__ . "/vendor/autoload.php"; $rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); $mem = ''; while($rr->waitPayload()){ - $mem .= str_repeat("a", 1024*1024); + $mem .= str_repeat("a", 1024*1024*10); $rr->respond(new \Spiral\RoadRunner\Payload("")); } diff --git a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml index 3e392577..e92ce051 100644 --- a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml +++ b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml @@ -25,7 +25,7 @@ http: supervisor: watch_tick: 1s ttl: 0 - idle_ttl: 5s + idle_ttl: 1s exec_ttl: 10s max_worker_memory: 100 logs: diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go index cfc5bc01..d55491ea 100644 --- a/tests/plugins/http/http_plugin_test.go +++ b/tests/plugins/http/http_plugin_test.go @@ -1287,9 +1287,13 @@ func TestHTTPSupervisedPool(t *testing.T) { }() time.Sleep(time.Second * 1) - t.Run("HTTPEchoTest", echoHTTP2) + t.Run("HTTPEchoRunActivateWorker", echoHTTP2) + // bigger timeout to handle idle_ttl on slow systems + time.Sleep(time.Second * 10) + t.Run("HTTPInformerCompareWorkersTestBefore", informerTestBefore) + t.Run("HTTPEchoShouldBeNewWorker", echoHTTP2) // worker should be destructed (idle_ttl) - t.Run("HTTPInformerCompareWorkersTest", informerTest2) + t.Run("HTTPInformerCompareWorkersTestAfter", informerTestAfter) stopCh <- struct{}{} wg.Wait() @@ -1314,11 +1318,12 @@ func echoHTTP2(t *testing.T) { // sleep // supervisor destroy worker // compare pid's -func informerTest2(t *testing.T) { +var workerPid int = 0 + +func informerTestBefore(t *testing.T) { conn, err := net.Dial("tcp", "127.0.0.1:15432") assert.NoError(t, err) client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - pid := 0 // WorkerList contains list of workers. list := struct { // Workers is list of workers. @@ -1329,18 +1334,25 @@ func informerTest2(t *testing.T) { assert.NoError(t, err) assert.Len(t, list.Workers, 1) // save the pid - pid = list.Workers[0].Pid - time.Sleep(time.Second * 10) + workerPid = list.Workers[0].Pid +} - list = struct { +func informerTestAfter(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:15432") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + // WorkerList contains list of workers. + list := struct { // Workers is list of workers. Workers []tools.ProcessState `json:"workers"` }{} + assert.NotZero(t, workerPid) + err = client.Call("informer.Workers", "http", &list) assert.NoError(t, err) assert.Len(t, list.Workers, 1) - assert.NotEqual(t, list.Workers[0].Pid, pid) + assert.NotEqual(t, workerPid, list.Workers[0].Pid) } func get(url string) (string, *http.Response, error) { diff --git a/tests/sleep.php b/tests/sleep.php index fb5c9df2..d36ae3e3 100644 --- a/tests/sleep.php +++ b/tests/sleep.php @@ -10,6 +10,6 @@ require __DIR__ . "/vendor/autoload.php"; $rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); while($rr->waitPayload()){ - sleep(3); + sleep(300); $rr->respond(new \Spiral\RoadRunner\Payload("")); } -- cgit v1.2.3