summaryrefslogtreecommitdiff
path: root/transport
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-12-15 00:12:23 +0300
committerValery Piashchynski <[email protected]>2021-12-15 00:12:23 +0300
commitb4e4f7e7e60bff48a63df4a3c606398ea2a32d8a (patch)
treed018a39795b94f61e1dadde54ce3382fc7e039b0 /transport
parentf2c79017ae5759256b03ec58b608f298a29e4b96 (diff)
Update static_pool and worker to wait response from the worker
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'transport')
-rw-r--r--transport/pipe/pipe_factory_spawn_test.go36
-rwxr-xr-xtransport/pipe/pipe_factory_test.go36
-rw-r--r--transport/socket/socket_factory_spawn_test.go30
-rwxr-xr-xtransport/socket/socket_factory_test.go30
4 files changed, 66 insertions, 66 deletions
diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go
index 96dd37a6..7e04f113 100644
--- a/transport/pipe/pipe_factory_spawn_test.go
+++ b/transport/pipe/pipe_factory_spawn_test.go
@@ -133,18 +133,9 @@ func Test_Pipe_Invalid2(t *testing.T) {
func Test_Pipe_Echo2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- err = w.Stop()
- if err != nil {
- t.Errorf("error stopping the Process: error %v", err)
- }
- }()
+ assert.NoError(t, err)
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NoError(t, err)
@@ -152,27 +143,31 @@ func Test_Pipe_Echo2(t *testing.T) {
assert.NotNil(t, res.Body)
assert.Empty(t, res.Context)
+ go func() {
+ if w.Wait() != nil {
+ t.Fail()
+ }
+ }()
+
assert.Equal(t, "hello", res.String())
+ err = w.Stop()
+ assert.NoError(t, err)
}
func Test_Pipe_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
w, err := NewPipeFactory().SpawnWorker(cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
+ assert.NoError(t, err)
+ require.NotNil(t, w)
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
assert.Error(t, err)
assert.Nil(t, res)
+
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.Error(t, err)
}
func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
@@ -315,7 +310,6 @@ func Test_BadPayload2(t *testing.T) {
}()
res, err := sw.Exec(&payload.Payload{})
-
assert.Error(t, err)
assert.Nil(t, res)
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index 7ca49d09..c69be298 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -159,6 +159,7 @@ func Test_Pipe_Echo(t *testing.T) {
if err != nil {
t.Fatal(err)
}
+
defer func() {
err = w.Stop()
if err != nil {
@@ -167,14 +168,18 @@ func Test_Pipe_Echo(t *testing.T) {
}()
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
assert.Empty(t, res.Context)
+ go func() {
+ if w.Wait() != nil {
+ t.Fail()
+ }
+ }()
+
assert.Equal(t, "hello", res.String())
}
@@ -194,14 +199,18 @@ func Test_Pipe_Echo_Script(t *testing.T) {
}()
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body)
assert.Empty(t, res.Context)
+ go func() {
+ if w.Wait() != nil {
+ t.Fail()
+ }
+ }()
+
assert.Equal(t, "hello", res.String())
}
@@ -210,21 +219,22 @@ func Test_Pipe_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
ctx := context.Background()
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
+ require.NoError(t, err)
+ require.NotNil(t, w)
+
+ go func() {
+ errW := w.Wait()
+ require.Error(t, errW)
}()
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
-
assert.Error(t, err)
assert.Nil(t, res)
+
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.NoError(t, err)
}
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go
index 2db2fd40..fd852080 100644
--- a/transport/socket/socket_factory_spawn_test.go
+++ b/transport/socket/socket_factory_spawn_test.go
@@ -53,7 +53,6 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
}
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
-
f := NewSocketServer(ls, time.Minute)
defer func() {
err = ls.Close()
@@ -66,6 +65,10 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, w)
+ go func() {
+ require.NoError(t, w.Wait())
+ }()
+
err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
@@ -180,15 +183,7 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Error(t, errW)
}()
- defer func() {
- time.Sleep(time.Second)
- err2 := w.Stop()
- // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
- assert.Error(t, err2)
- }()
-
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res)
@@ -198,6 +193,12 @@ func Test_Tcp_Broken2(t *testing.T) {
if !strings.Contains(ev.Message(), "undefined_function()") {
t.Fatal("should contain undefined_function() string")
}
+
+ time.Sleep(time.Second)
+ err2 := w.Stop()
+ // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
+ // but process exited
+ assert.NoError(t, err2)
}
func Test_Tcp_Echo2(t *testing.T) {
@@ -347,14 +348,7 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Error(t, errW)
}()
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
-
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
@@ -365,6 +359,10 @@ func Test_Unix_Broken2(t *testing.T) {
if !strings.Contains(ev.Message(), "undefined_function()") {
t.Fatal("should contain undefined_function string")
}
+
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.NoError(t, err)
}
func Test_Unix_Echo2(t *testing.T) {
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index 7b28a847..10885bac 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -58,7 +58,6 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
}
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
-
f := NewSocketServer(ls, time.Minute)
defer func() {
err = ls.Close()
@@ -71,6 +70,10 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, w)
+ go func() {
+ require.NoError(t, w.Wait())
+ }()
+
err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
@@ -221,15 +224,7 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, errW)
}()
- defer func() {
- time.Sleep(time.Second)
- err2 := w.Stop()
- // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
- assert.Error(t, err2)
- }()
-
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res)
@@ -239,6 +234,12 @@ func Test_Tcp_Broken(t *testing.T) {
if !strings.Contains(ev.Message(), "undefined_function()") {
t.Fatal("should contain undefined_function string")
}
+
+ time.Sleep(time.Second)
+ err2 := w.Stop()
+ // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
+ // but process is stopped
+ assert.NoError(t, err2)
}
func Test_Tcp_Echo(t *testing.T) {
@@ -460,14 +461,7 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, errW)
}()
- defer func() {
- time.Sleep(time.Second)
- err = w.Stop()
- assert.Error(t, err)
- }()
-
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
@@ -478,6 +472,10 @@ func Test_Unix_Broken(t *testing.T) {
t.Fatal("should contain undefined_function string")
}
+ time.Sleep(time.Second)
+ err = w.Stop()
+ assert.NoError(t, err)
+
wg.Wait()
}