summaryrefslogtreecommitdiff
path: root/pkg/transport
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-03 17:31:17 +0300
committerValery Piashchynski <[email protected]>2021-02-03 17:31:17 +0300
commit8eda5dc6f0f7e05d7b3d62e1861af05b49a2574a (patch)
treefae66ad49d2a4624a7caf45a5bf07d53e5c7d26f /pkg/transport
parent20a1a5d2eb26090e0eef0e6772330ee2a52526fa (diff)
Fix memory leak in the Worker.go
Diffstat (limited to 'pkg/transport')
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go14
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go16
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go62
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go64
4 files changed, 136 insertions, 20 deletions
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index e247324c..663b3dd5 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -106,11 +106,21 @@ func Test_Pipe_PipeError4(t *testing.T) {
func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+ w, err := NewPipeFactory().SpawnWorker(cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Pipe_Invalid2(t *testing.T) {
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index b23af19f..6045dd91 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -117,11 +117,23 @@ func Test_Pipe_PipeError2(t *testing.T) {
func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Pipe_Invalid(t *testing.T) {
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index 0e29e7d2..50729546 100644
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -3,11 +3,13 @@ package socket
import (
"net"
"os/exec"
+ "strings"
"sync"
"syscall"
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
@@ -108,10 +110,21 @@ func Test_Tcp_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
assert.Nil(t, w)
assert.Error(t, err2)
- assert.Contains(t, err2.Error(), "failboot")
+ <-finish
}
func Test_Tcp_Invalid2(t *testing.T) {
@@ -149,7 +162,18 @@ func Test_Tcp_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -159,7 +183,6 @@ func Test_Tcp_Broken2(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -176,6 +199,7 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
wg.Wait()
+ <-finish
}
func Test_Tcp_Echo2(t *testing.T) {
@@ -250,10 +274,21 @@ func Test_Unix_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Unix_Timeout2(t *testing.T) {
@@ -297,7 +332,18 @@ func Test_Unix_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -307,7 +353,6 @@ func Test_Unix_Broken2(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -324,6 +369,7 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
wg.Wait()
+ <-finish
}
func Test_Unix_Echo2(t *testing.T) {
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index f55fc3dd..4abcd5d9 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -4,10 +4,12 @@ import (
"context"
"net"
"os/exec"
+ "strings"
"sync"
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
@@ -118,10 +120,21 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
assert.Nil(t, w)
assert.Error(t, err2)
- assert.Contains(t, err2.Error(), "failboot")
+ <-finish
}
func Test_Tcp_Timeout(t *testing.T) {
@@ -186,7 +199,18 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -196,7 +220,6 @@ func Test_Tcp_Broken(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -213,6 +236,7 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
wg.Wait()
+ <-finish
}
func Test_Tcp_Echo(t *testing.T) {
@@ -301,10 +325,21 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Unix_Timeout(t *testing.T) {
@@ -366,7 +401,20 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ block := make(chan struct{})
+ listener := func(event interface{}) {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ if wev.Event == events.EventWorkerStderr {
+ e := string(wev.Payload.([]byte))
+ if strings.ContainsAny(e, "undefined_function()") {
+ block <- struct{}{}
+ return
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -376,7 +424,6 @@ func Test_Unix_Broken(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -392,6 +439,7 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
+ <-block
wg.Wait()
}