diff options
author | Valery Piashchynski <[email protected]> | 2021-02-03 17:31:17 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-03 17:31:17 +0300 |
commit | 8eda5dc6f0f7e05d7b3d62e1861af05b49a2574a (patch) | |
tree | fae66ad49d2a4624a7caf45a5bf07d53e5c7d26f /pkg/transport | |
parent | 20a1a5d2eb26090e0eef0e6772330ee2a52526fa (diff) |
Fix memory leak in the Worker.go
Diffstat (limited to 'pkg/transport')
-rw-r--r-- | pkg/transport/pipe/pipe_factory_spawn_test.go | 14 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 16 | ||||
-rw-r--r-- | pkg/transport/socket/socket_factory_spawn_test.go | 62 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory_test.go | 64 |
4 files changed, 136 insertions, 20 deletions
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index e247324c..663b3dd5 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -106,11 +106,21 @@ func Test_Pipe_PipeError4(t *testing.T) { func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err := NewPipeFactory().SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + w, err := NewPipeFactory().SpawnWorker(cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Pipe_Invalid2(t *testing.T) { diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index b23af19f..6045dd91 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -117,11 +117,23 @@ func Test_Pipe_PipeError2(t *testing.T) { func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Pipe_Invalid(t *testing.T) { diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index 0e29e7d2..50729546 100644 --- a/pkg/transport/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -3,11 +3,13 @@ package socket import ( "net" "os/exec" + "strings" "sync" "syscall" "testing" "time" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" @@ -108,10 +110,21 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) assert.Nil(t, w) assert.Error(t, err2) - assert.Contains(t, err2.Error(), "failboot") + <-finish } func Test_Tcp_Invalid2(t *testing.T) { @@ -149,7 +162,18 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) if err != nil { t.Fatal(err) } @@ -159,7 +183,6 @@ func Test_Tcp_Broken2(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -176,6 +199,7 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) wg.Wait() + <-finish } func Test_Tcp_Echo2(t *testing.T) { @@ -250,10 +274,21 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Unix_Timeout2(t *testing.T) { @@ -297,7 +332,18 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) if err != nil { t.Fatal(err) } @@ -307,7 +353,6 @@ func Test_Unix_Broken2(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -324,6 +369,7 @@ func Test_Unix_Broken2(t *testing.T) { assert.Nil(t, res.Context) assert.Nil(t, res.Body) wg.Wait() + <-finish } func Test_Unix_Echo2(t *testing.T) { diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index f55fc3dd..4abcd5d9 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -4,10 +4,12 @@ import ( "context" "net" "os/exec" + "strings" "sync" "testing" "time" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" @@ -118,10 +120,21 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) assert.Nil(t, w) assert.Error(t, err2) - assert.Contains(t, err2.Error(), "failboot") + <-finish } func Test_Tcp_Timeout(t *testing.T) { @@ -186,7 +199,18 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) if err != nil { t.Fatal(err) } @@ -196,7 +220,6 @@ func Test_Tcp_Broken(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -213,6 +236,7 @@ func Test_Tcp_Broken(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) wg.Wait() + <-finish } func Test_Tcp_Echo(t *testing.T) { @@ -301,10 +325,21 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Unix_Timeout(t *testing.T) { @@ -366,7 +401,20 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + block := make(chan struct{}) + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerStderr { + e := string(wev.Payload.([]byte)) + if strings.ContainsAny(e, "undefined_function()") { + block <- struct{}{} + return + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) if err != nil { t.Fatal(err) } @@ -376,7 +424,6 @@ func Test_Unix_Broken(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -392,6 +439,7 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) + <-block wg.Wait() } |