summaryrefslogtreecommitdiff
path: root/socket_factory_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'socket_factory_test.go')
-rw-r--r--socket_factory_test.go303
1 files changed, 176 insertions, 127 deletions
diff --git a/socket_factory_test.go b/socket_factory_test.go
index abb40f16..45443337 100644
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -1,14 +1,18 @@
package roadrunner
import (
- "github.com/stretchr/testify/assert"
+ "context"
"net"
"os/exec"
+ "sync"
"testing"
"time"
+
+ "github.com/stretchr/testify/assert"
)
func Test_Tcp_Start(t *testing.T) {
+ ctx := context.Background()
time.Sleep(time.Millisecond * 10) // to ensure free socket
ls, err := net.Listen("tcp", "localhost:9007")
@@ -25,23 +29,23 @@ func Test_Tcp_Start(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
- w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
go func() {
- assert.NoError(t, w.Wait())
+ assert.NoError(t, w.Wait(ctx))
}()
- err = w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
}
}
func Test_Tcp_StartCloseFactory(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
-
+ ctx := context.Background()
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
} else {
@@ -50,7 +54,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
- f := NewSocketFactory(ls, time.Minute)
+ f := NewSocketServer(ls, time.Minute)
defer func() {
err := ls.Close()
if err != nil {
@@ -58,23 +62,19 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
}
}()
- w, err := f.SpawnWorker(cmd)
+ w, err := f.SpawnWorkerWithContext(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
- err = w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
}
}
func Test_Tcp_StartError(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
-
+ ctx := context.Background()
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
@@ -93,37 +93,37 @@ func Test_Tcp_StartError(t *testing.T) {
t.Errorf("error executing the command: error %v", err)
}
- w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
-func Test_Tcp_Failboot(t *testing.T) {
- time.Sleep(time.Millisecond * 10) // to ensure free socket
-
- ls, err := net.Listen("tcp", "localhost:9007")
- if assert.NoError(t, err) {
- defer func() {
- err3 := ls.Close()
- if err3 != nil {
- t.Errorf("error closing the listener: error %v", err3)
- }
- }()
- } else {
- t.Skip("socket is busy")
- }
-
- cmd := exec.Command("php", "tests/failboot.php")
-
- w, err2 := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
- assert.Nil(t, w)
- assert.Error(t, err2)
- assert.Contains(t, err2.Error(), "failboot")
-}
+// func Test_Tcp_Failboot(t *testing.T) {
+// time.Sleep(time.Millisecond * 10) // to ensure free socket
+//
+// ls, err := net.Listen("tcp", "localhost:9007")
+// if assert.NoError(t, err) {
+// defer func() {
+// err3 := ls.Close()
+// if err3 != nil {
+// t.Errorf("error closing the listener: error %v", err3)
+// }
+// }()
+// } else {
+// t.Skip("socket is busy")
+// }
+//
+// cmd := exec.Command("php", "tests/failboot.php")
+//
+// w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(cmd)
+// assert.Nil(t, w)
+// assert.Error(t, err2)
+// assert.Contains(t, err2.Error(), "failboot")
+//}
func Test_Tcp_Timeout(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
-
+ ctx := context.Background()
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
@@ -138,15 +138,15 @@ func Test_Tcp_Timeout(t *testing.T) {
cmd := exec.Command("php", "tests/slow-client.php", "echo", "tcp", "200", "0")
- w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "relay timeout")
+ assert.Contains(t, err.Error(), "context deadline exceeded")
}
func Test_Tcp_Invalid(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
-
+ ctx := context.Background()
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
@@ -161,14 +161,14 @@ func Test_Tcp_Invalid(t *testing.T) {
cmd := exec.Command("php", "tests/invalid.php")
- w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
func Test_Tcp_Broken(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
-
+ ctx := context.Background()
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
@@ -183,29 +183,38 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "broken", "tcp")
- w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
- go func() {
- err := w.Wait()
-
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
- }()
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ //go func() {
+ // err := w.Wait()
+ //
+ // assert.Error(t, err)
+ // assert.Contains(t, err.Error(), "undefined_function()")
+ //}()
defer func() {
time.Sleep(time.Second)
- err2 := w.Stop()
- assert.NoError(t, err2)
+ err2 := w.Stop(ctx)
+ // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
+ assert.Error(t, err2)
}()
- res, err := w.Exec(&Payload{Body: []byte("hello")})
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+ res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
}
func Test_Tcp_Echo(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
-
+ ctx := context.Background()
ls, err := net.Listen("tcp", "localhost:9007")
if assert.NoError(t, err) {
defer func() {
@@ -220,18 +229,23 @@ func Test_Tcp_Echo(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
- w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
+ w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ //go func() {
+ // assert.NoError(t, w.Wait())
+ //}()
defer func() {
- err = w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
}
}()
- res, err := w.Exec(&Payload{Body: []byte("hello")})
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -242,6 +256,7 @@ func Test_Tcp_Echo(t *testing.T) {
}
func Test_Unix_Start(t *testing.T) {
+ ctx := context.Background()
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
@@ -256,22 +271,23 @@ func Test_Unix_Start(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "unix")
- w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
- go func() {
- assert.NoError(t, w.Wait())
- }()
+ //go func() {
+ // assert.NoError(t, w.Wait())
+ //}()
- err = w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
}
}
func Test_Unix_Failboot(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
+ ctx := context.Background()
if err == nil {
defer func() {
err := ls.Close()
@@ -285,7 +301,7 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "tests/failboot.php")
- w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Second*2).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failboot")
@@ -293,6 +309,7 @@ func Test_Unix_Failboot(t *testing.T) {
func Test_Unix_Timeout(t *testing.T) {
ls, err := net.Listen("unix", "sock.unix")
+ ctx := context.Background()
if err == nil {
defer func() {
err := ls.Close()
@@ -306,13 +323,14 @@ func Test_Unix_Timeout(t *testing.T) {
cmd := exec.Command("php", "tests/slow-client.php", "echo", "unix", "200", "0")
- w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "relay timeout")
+ assert.Contains(t, err.Error(), "context deadline exceeded")
}
func Test_Unix_Invalid(t *testing.T) {
+ ctx := context.Background()
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
@@ -327,12 +345,13 @@ func Test_Unix_Invalid(t *testing.T) {
cmd := exec.Command("php", "tests/invalid.php")
- w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
func Test_Unix_Broken(t *testing.T) {
+ ctx := context.Background()
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
@@ -347,26 +366,40 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "broken", "unix")
- w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
go func() {
- err := w.Wait()
+ defer wg.Done()
+ err := w.Wait(ctx)
assert.Error(t, err)
assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
time.Sleep(time.Second)
- err = w.Stop()
- assert.NoError(t, err)
+ err = w.Stop(ctx)
+ assert.Error(t, err)
}()
- res, err := w.Exec(&Payload{Body: []byte("hello")})
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
+ wg.Wait()
}
func Test_Unix_Echo(t *testing.T) {
+ ctx := context.Background()
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
@@ -381,18 +414,26 @@ func Test_Unix_Echo(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "unix")
- w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ //go func() {
+ // assert.NoError(t, w.Wait())
+ //}()
defer func() {
- err = w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
}
}()
- res, err := w.Exec(&Payload{Body: []byte("hello")})
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -403,10 +444,11 @@ func Test_Unix_Echo(t *testing.T) {
}
func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
+ ctx := context.Background()
ls, err := net.Listen("tcp", "localhost:9007")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
b.Errorf("error closing the listener: error %v", err)
}
@@ -415,29 +457,33 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
b.Skip("socket is busy")
}
- f := NewSocketFactory(ls, time.Minute)
+ f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
- w, _ := f.SpawnWorker(cmd)
- go func() {
- if w.Wait() != nil {
- b.Fail()
- }
- }()
+ w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+ //go func() {
+ // if w.Wait() != nil {
+ // b.Fail()
+ // }
+ //}()
- err = w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
+ b.Errorf("error stopping the WorkerProcess: error %v", err)
}
}
}
func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
+ ctx := context.Background()
ls, err := net.Listen("tcp", "localhost:9007")
if err == nil {
defer func() {
- err := ls.Close()
+ err = ls.Close()
if err != nil {
b.Errorf("error closing the listener: error %v", err)
}
@@ -448,28 +494,31 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
- w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
- go func() {
- err := w.Wait()
- if err != nil {
- b.Errorf("error waiting: %v", err)
- }
- }()
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
defer func() {
- err = w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
+ b.Errorf("error stopping the WorkerProcess: error %v", err)
}
}()
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ b.Fatal(err)
+ }
+
for n := 0; n < b.N; n++ {
- if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
}
func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
+ ctx := context.Background()
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
@@ -482,25 +531,23 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
b.Skip("socket is busy")
}
- f := NewSocketFactory(ls, time.Minute)
+ f := NewSocketServer(ls, time.Minute)
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "tests/client.php", "echo", "unix")
- w, _ := f.SpawnWorker(cmd)
- go func() {
- if w.Wait() != nil {
- b.Fail()
- }
- }()
-
- err = w.Stop()
+ w, err := f.SpawnWorkerWithContext(ctx, cmd)
if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
+ b.Fatal(err)
+ }
+ err = w.Stop(ctx)
+ if err != nil {
+ b.Errorf("error stopping the WorkerProcess: error %v", err)
}
}
}
func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
+ ctx := context.Background()
ls, err := net.Listen("unix", "sock.unix")
if err == nil {
defer func() {
@@ -515,22 +562,24 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "tests/client.php", "echo", "unix")
- w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd)
- go func() {
- err := w.Wait()
- if err != nil {
- b.Errorf("error waiting: %v", err)
- }
- }()
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
defer func() {
- err = w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- b.Errorf("error stopping the worker: error %v", err)
+ b.Errorf("error stopping the WorkerProcess: error %v", err)
}
}()
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ b.Fatal(err)
+ }
+
for n := 0; n < b.N; n++ {
- if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}