summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-24 13:46:37 +0300
committerGitHub <[email protected]>2021-02-24 13:46:37 +0300
commitcaea9cb452fda97a9496bc33190c95fbc27e54c3 (patch)
tree0d226eb8ad9730ede1f7cd80b5f7b44d1fb23b0a
parent18a097292a567fccdd02304ff236bf78d769965d (diff)
parente684ac16035bed9a4c09677b0db3b33477955dc9 (diff)
Merge pull request #562 from spiral/supervisor_update
🐛 fix(supervisor): supervisor behavior update
-rw-r--r--CHANGELOG.md7
-rw-r--r--go.sum6
-rw-r--r--pkg/pool/interface.go6
-rwxr-xr-xpkg/pool/static_pool.go10
-rwxr-xr-xpkg/pool/static_pool_test.go6
-rwxr-xr-xpkg/pool/supervisor_pool.go63
-rw-r--r--pkg/pool/supervisor_test.go42
-rw-r--r--pkg/worker/interface.go2
-rwxr-xr-xpkg/worker/sync_worker.go2
-rw-r--r--tests/exec_ttl.php15
-rw-r--r--tests/idle.php15
-rw-r--r--tests/memleak.php2
-rw-r--r--tests/plugins/http/configs/.rr-http-supervised-pool.yaml2
-rw-r--r--tests/plugins/http/http_plugin_test.go28
-rw-r--r--tests/sleep.php2
15 files changed, 125 insertions, 83 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b11ffdad..3f31aca3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,11 +4,12 @@ CHANGELOG
v2.0.0-RC.4 (20.02.2021)
-------------------
-- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550)
-- Endure container update to v1.0.0-RC.2 version
+- PHP tests use latest signatures (https://github.com/spiral/roadrunner/pull/550).
+- Endure container update to v1.0.0-RC.2 version.
- Remove unneeded mutex from the `http.Workers` method.
-- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557)
+- Rename `checker` plugin package to `status`, remove `/v1` endpoint prefix (#557).
- Add static, headers, status, gzip plugins to the `main.go`.
+- Fix workers pool behavior -> idle_ttl, ttl, max_memory are soft errors and exec_ttl is hard error.
v2.0.0-RC.3 (17.02.2021)
-------------------
diff --git a/go.sum b/go.sum
index 4bac6827..113aa09b 100644
--- a/go.sum
+++ b/go.sum
@@ -114,11 +114,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
-github.com/go-redis/redis/v8 v8.5.0 h1:L3r1Q3I5WOUdXZGCP6g44EruKh0u3n6co5Hl5xWkdGA=
-github.com/go-redis/redis/v8 v8.5.0/go.mod h1:YmEcgBDttjnkbMzDAhDtQxY9yVA7jMN6PCR5HeMvqFE=
github.com/go-redis/redis/v8 v8.6.0 h1:swqbqOrxaPztsj2Hf1p94M3YAgl7hYEpcw21z299hh8=
github.com/go-redis/redis/v8 v8.6.0/go.mod h1:DQ9q4Rk2HtwkrwVrdgmphoOQDMfpvcd/nHEwRsicg8s=
-github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68 h1:QR2R74UbwMtnEVGVvNfcx6mQmWGgN8abQeXOy92pQIo=
github.com/go-restit/lzjson v0.0.0-20161206095556-efe3c53acc68/go.mod h1:7vXSKQt83WmbPeyVjCfNT9YDJ5BUFmcwFsEjI9SCvYM=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@@ -416,11 +413,8 @@ github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
-github.com/yookoala/gofast v0.5.0 h1:QFGxVAf5ksx0eqFapzvc1ZDvk/LoYhbCyiw7WKoXtSA=
-github.com/yookoala/gofast v0.5.0/go.mod h1:OJU201Q6HCaE1cASckaTbMm3KB6e0cZxK0mgqfwOKvQ=
github.com/yookoala/gofast v0.6.0 h1:E5x2acfUD7GkzCf8bmIMwnV10VxDy5tUCHc5LGhluwc=
github.com/yookoala/gofast v0.6.0/go.mod h1:OJU201Q6HCaE1cASckaTbMm3KB6e0cZxK0mgqfwOKvQ=
-github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg=
github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index bfc56c3f..4ef2f2e7 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -15,9 +15,6 @@ type Pool interface {
// Exec executes task with payload
Exec(rqs payload.Payload) (payload.Payload, error)
- // ExecWithContext executes task with context which is used with timeout
- ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
-
// Workers returns worker list associated with the pool.
Workers() (workers []worker.BaseProcess)
@@ -26,4 +23,7 @@ type Pool interface {
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
+
+ // ExecWithContext executes task with context which is used with timeout
+ execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index f1b20bb9..0617cbc0 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -168,16 +168,16 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
}
// Be careful, sync with pool.Exec method
-func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
- ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
defer cancel()
- w, err := sp.getWorker(ctxGetFree, op)
+ w, err := sp.getWorker(ctxAlloc, op)
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.(worker.SyncWorker).ExecWithTimeout(ctx, p)
+ rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p)
if err != nil {
return sp.err_encoder(err, w)
}
@@ -185,7 +185,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
// worker want's to be terminated
if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest {
sp.stopWorker(w)
- return sp.ExecWithContext(ctx, p)
+ return sp.execWithTTL(ctx, p)
}
err = sp.checkMaxJobs(w)
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index c8b3616c..b1318f9d 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -191,7 +191,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
time.Sleep(time.Second)
- res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
@@ -518,11 +518,11 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.NotNil(t, p)
go func() {
- _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
}()
time.Sleep(time.Second)
- res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
+ res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index bfb997d8..5abeae7a 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -48,7 +48,7 @@ type ttlExec struct {
p payload.Payload
}
-func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
+func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -58,7 +58,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
defer cancel()
go func() {
- res, err := sp.pool.ExecWithContext(ctx, rqs)
+ res, err := sp.pool.execWithTTL(ctx, rqs)
if err != nil {
c <- ttlExec{
err: errors.E(op, err),
@@ -86,13 +86,42 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
}
-func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("supervised_exec")
- rsp, err := sp.pool.Exec(p)
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
+func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("supervised_exec_with_context")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ c := make(chan ttlExec, 1)
+ ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
+ defer cancel()
+ go func() {
+ res, err := sp.pool.execWithTTL(ctx, rqs)
+ if err != nil {
+ c <- ttlExec{
+ err: errors.E(op, err),
+ p: payload.Payload{},
+ }
+ }
+
+ c <- ttlExec{
+ err: nil,
+ p: res,
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return payload.Payload{}, res.err
+ }
+
+ return res.p, nil
+ }
}
- return rsp, nil
}
func (sp *supervised) GetConfig() interface{} {
@@ -155,21 +184,13 @@ func (sp *supervised) control() {
}
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- err = sp.pool.RemoveWorker(workers[i])
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
- return
- }
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- err = sp.pool.RemoveWorker(workers[i])
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
- return
- }
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
}
@@ -210,11 +231,7 @@ func (sp *supervised) control() {
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
- err = sp.pool.RemoveWorker(workers[i])
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
- return
- }
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 85af4672..d7e97fdd 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -9,7 +9,6 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
- "github.com/spiral/roadrunner/v2/tools"
"github.com/stretchr/testify/assert"
)
@@ -37,28 +36,8 @@ func TestSupervisedPool_Exec(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- stopCh := make(chan struct{})
- defer p.Destroy(context.Background())
- go func() {
- for {
- select {
- case <-stopCh:
- return
- default:
- workers := p.Workers()
- if len(workers) > 0 {
- s, err := tools.WorkerProcessState(workers[0])
- assert.NoError(t, err)
- assert.NotNil(t, s)
- // since this is soft limit, double max memory limit watch
- if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 {
- assert.Fail(t, "max memory reached, worker still alive")
- }
- }
- }
- }
- }()
+ pidBefore := p.Workers()[0].Pid()
for i := 0; i < 100; i++ {
time.Sleep(time.Millisecond * 100)
@@ -69,7 +48,9 @@ func TestSupervisedPool_Exec(t *testing.T) {
assert.NoError(t, err)
}
- stopCh <- struct{}{}
+ assert.NotEqual(t, pidBefore, p.Workers()[0].Pid())
+
+ p.Destroy(context.Background())
}
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
@@ -99,7 +80,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.ExecWithContext(context.Background(), payload.Payload{
+ resp, err := p.execWithTTL(context.Background(), payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -129,7 +110,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
ctx := context.Background()
p, err := Initialize(
ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "../../tests/idle.php", "pipes") },
pipe.NewPipeFactory(),
cfgExecTTL,
)
@@ -139,7 +120,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.ExecWithContext(context.Background(), payload.Payload{
+ resp, err := p.execWithTTL(context.Background(), payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -149,6 +130,13 @@ func TestSupervisedPool_Idle(t *testing.T) {
assert.Empty(t, resp.Context)
time.Sleep(time.Second * 5)
+
+ // worker should be marked as invalid and reallocated
+ _, err = p.execWithTTL(context.Background(), payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+ assert.NoError(t, err)
// should be new worker with new pid
assert.NotEqual(t, pid, p.Workers()[0].Pid())
p.Destroy(context.Background())
@@ -170,7 +158,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
ctx := context.Background()
p, err := Initialize(
ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") },
pipe.NewPipeFactory(),
cfgExecTTL,
)
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index 96eb25bc..2b68717a 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -70,5 +70,5 @@ type SyncWorker interface {
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs payload.Payload) (payload.Payload, error)
// ExecWithContext used to handle Exec with TTL
- ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error)
+ ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error)
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 82a5462a..ac987c14 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -63,7 +63,7 @@ type wexec struct {
}
// Exec payload without TTL timeout.
-func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec_worker_with_timeout")
c := make(chan wexec, 1)
diff --git a/tests/exec_ttl.php b/tests/exec_ttl.php
new file mode 100644
index 00000000..fb5c9df2
--- /dev/null
+++ b/tests/exec_ttl.php
@@ -0,0 +1,15 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require __DIR__ . "/vendor/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+
+while($rr->waitPayload()){
+ sleep(3);
+ $rr->respond(new \Spiral\RoadRunner\Payload(""));
+}
diff --git a/tests/idle.php b/tests/idle.php
new file mode 100644
index 00000000..fb5c9df2
--- /dev/null
+++ b/tests/idle.php
@@ -0,0 +1,15 @@
+<?php
+
+declare(strict_types=1);
+
+use Spiral\Goridge\StreamRelay;
+use Spiral\RoadRunner\Worker as RoadRunner;
+
+require __DIR__ . "/vendor/autoload.php";
+
+$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
+
+while($rr->waitPayload()){
+ sleep(3);
+ $rr->respond(new \Spiral\RoadRunner\Payload(""));
+}
diff --git a/tests/memleak.php b/tests/memleak.php
index f2879e18..96ed5006 100644
--- a/tests/memleak.php
+++ b/tests/memleak.php
@@ -10,6 +10,6 @@ require __DIR__ . "/vendor/autoload.php";
$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
$mem = '';
while($rr->waitPayload()){
- $mem .= str_repeat("a", 1024*1024);
+ $mem .= str_repeat("a", 1024*1024*10);
$rr->respond(new \Spiral\RoadRunner\Payload(""));
}
diff --git a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
index 3e392577..e92ce051 100644
--- a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
+++ b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
@@ -25,7 +25,7 @@ http:
supervisor:
watch_tick: 1s
ttl: 0
- idle_ttl: 5s
+ idle_ttl: 1s
exec_ttl: 10s
max_worker_memory: 100
logs:
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index cfc5bc01..d55491ea 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -1287,9 +1287,13 @@ func TestHTTPSupervisedPool(t *testing.T) {
}()
time.Sleep(time.Second * 1)
- t.Run("HTTPEchoTest", echoHTTP2)
+ t.Run("HTTPEchoRunActivateWorker", echoHTTP2)
+ // bigger timeout to handle idle_ttl on slow systems
+ time.Sleep(time.Second * 10)
+ t.Run("HTTPInformerCompareWorkersTestBefore", informerTestBefore)
+ t.Run("HTTPEchoShouldBeNewWorker", echoHTTP2)
// worker should be destructed (idle_ttl)
- t.Run("HTTPInformerCompareWorkersTest", informerTest2)
+ t.Run("HTTPInformerCompareWorkersTestAfter", informerTestAfter)
stopCh <- struct{}{}
wg.Wait()
@@ -1314,11 +1318,12 @@ func echoHTTP2(t *testing.T) {
// sleep
// supervisor destroy worker
// compare pid's
-func informerTest2(t *testing.T) {
+var workerPid int = 0
+
+func informerTestBefore(t *testing.T) {
conn, err := net.Dial("tcp", "127.0.0.1:15432")
assert.NoError(t, err)
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- pid := 0
// WorkerList contains list of workers.
list := struct {
// Workers is list of workers.
@@ -1329,18 +1334,25 @@ func informerTest2(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, list.Workers, 1)
// save the pid
- pid = list.Workers[0].Pid
- time.Sleep(time.Second * 10)
+ workerPid = list.Workers[0].Pid
+}
- list = struct {
+func informerTestAfter(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:15432")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+ // WorkerList contains list of workers.
+ list := struct {
// Workers is list of workers.
Workers []tools.ProcessState `json:"workers"`
}{}
+ assert.NotZero(t, workerPid)
+
err = client.Call("informer.Workers", "http", &list)
assert.NoError(t, err)
assert.Len(t, list.Workers, 1)
- assert.NotEqual(t, list.Workers[0].Pid, pid)
+ assert.NotEqual(t, workerPid, list.Workers[0].Pid)
}
func get(url string) (string, *http.Response, error) {
diff --git a/tests/sleep.php b/tests/sleep.php
index fb5c9df2..d36ae3e3 100644
--- a/tests/sleep.php
+++ b/tests/sleep.php
@@ -10,6 +10,6 @@ require __DIR__ . "/vendor/autoload.php";
$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
while($rr->waitPayload()){
- sleep(3);
+ sleep(300);
$rr->respond(new \Spiral\RoadRunner\Payload(""));
}