diff options
Diffstat (limited to 'socket_factory_test.go')
-rw-r--r-- | socket_factory_test.go | 303 |
1 files changed, 176 insertions, 127 deletions
diff --git a/socket_factory_test.go b/socket_factory_test.go index abb40f16..45443337 100644 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -1,14 +1,18 @@ package roadrunner import ( - "github.com/stretchr/testify/assert" + "context" "net" "os/exec" + "sync" "testing" "time" + + "github.com/stretchr/testify/assert" ) func Test_Tcp_Start(t *testing.T) { + ctx := context.Background() time.Sleep(time.Millisecond * 10) // to ensure free socket ls, err := net.Listen("tcp", "localhost:9007") @@ -25,23 +29,23 @@ func Test_Tcp_Start(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "tcp") - w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) go func() { - assert.NoError(t, w.Wait()) + assert.NoError(t, w.Wait(ctx)) }() - err = w.Stop() + err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the worker: error %v", err) + t.Errorf("error stopping the WorkerProcess: error %v", err) } } func Test_Tcp_StartCloseFactory(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - + ctx := context.Background() ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { } else { @@ -50,7 +54,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "tcp") - f := NewSocketFactory(ls, time.Minute) + f := NewSocketServer(ls, time.Minute) defer func() { err := ls.Close() if err != nil { @@ -58,23 +62,19 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { } }() - w, err := f.SpawnWorker(cmd) + w, err := f.SpawnWorkerWithContext(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() + err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the worker: error %v", err) + t.Errorf("error stopping the WorkerProcess: error %v", err) } } func Test_Tcp_StartError(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - + ctx := context.Background() ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { @@ -93,37 +93,37 @@ func Test_Tcp_StartError(t *testing.T) { t.Errorf("error executing the command: error %v", err) } - w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } -func Test_Tcp_Failboot(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - - ls, err := net.Listen("tcp", "localhost:9007") - if assert.NoError(t, err) { - defer func() { - err3 := ls.Close() - if err3 != nil { - t.Errorf("error closing the listener: error %v", err3) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "tests/failboot.php") - - w, err2 := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) - assert.Nil(t, w) - assert.Error(t, err2) - assert.Contains(t, err2.Error(), "failboot") -} +// func Test_Tcp_Failboot(t *testing.T) { +// time.Sleep(time.Millisecond * 10) // to ensure free socket +// +// ls, err := net.Listen("tcp", "localhost:9007") +// if assert.NoError(t, err) { +// defer func() { +// err3 := ls.Close() +// if err3 != nil { +// t.Errorf("error closing the listener: error %v", err3) +// } +// }() +// } else { +// t.Skip("socket is busy") +// } +// +// cmd := exec.Command("php", "tests/failboot.php") +// +// w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(cmd) +// assert.Nil(t, w) +// assert.Error(t, err2) +// assert.Contains(t, err2.Error(), "failboot") +//} func Test_Tcp_Timeout(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - + ctx := context.Background() ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { @@ -138,15 +138,15 @@ func Test_Tcp_Timeout(t *testing.T) { cmd := exec.Command("php", "tests/slow-client.php", "echo", "tcp", "200", "0") - w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "relay timeout") + assert.Contains(t, err.Error(), "context deadline exceeded") } func Test_Tcp_Invalid(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - + ctx := context.Background() ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { @@ -161,14 +161,14 @@ func Test_Tcp_Invalid(t *testing.T) { cmd := exec.Command("php", "tests/invalid.php") - w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } func Test_Tcp_Broken(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - + ctx := context.Background() ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { @@ -183,29 +183,38 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "broken", "tcp") - w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) - go func() { - err := w.Wait() - - assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") - }() + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + if err != nil { + t.Fatal(err) + } + //go func() { + // err := w.Wait() + // + // assert.Error(t, err) + // assert.Contains(t, err.Error(), "undefined_function()") + //}() defer func() { time.Sleep(time.Second) - err2 := w.Stop() - assert.NoError(t, err2) + err2 := w.Stop(ctx) + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + assert.Error(t, err2) }() - res, err := w.Exec(&Payload{Body: []byte("hello")}) + sw, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) } func Test_Tcp_Echo(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket - + ctx := context.Background() ls, err := net.Listen("tcp", "localhost:9007") if assert.NoError(t, err) { defer func() { @@ -220,18 +229,23 @@ func Test_Tcp_Echo(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "tcp") - w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() + w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + //go func() { + // assert.NoError(t, w.Wait()) + //}() defer func() { - err = w.Stop() + err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the worker: error %v", err) + t.Errorf("error stopping the WorkerProcess: error %v", err) } }() - res, err := w.Exec(&Payload{Body: []byte("hello")}) + sw, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + + res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -242,6 +256,7 @@ func Test_Tcp_Echo(t *testing.T) { } func Test_Unix_Start(t *testing.T) { + ctx := context.Background() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -256,22 +271,23 @@ func Test_Unix_Start(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "unix") - w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) - go func() { - assert.NoError(t, w.Wait()) - }() + //go func() { + // assert.NoError(t, w.Wait()) + //}() - err = w.Stop() + err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the worker: error %v", err) + t.Errorf("error stopping the WorkerProcess: error %v", err) } } func Test_Unix_Failboot(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() if err == nil { defer func() { err := ls.Close() @@ -285,7 +301,7 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "tests/failboot.php") - w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Second*2).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "failboot") @@ -293,6 +309,7 @@ func Test_Unix_Failboot(t *testing.T) { func Test_Unix_Timeout(t *testing.T) { ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() if err == nil { defer func() { err := ls.Close() @@ -306,13 +323,14 @@ func Test_Unix_Timeout(t *testing.T) { cmd := exec.Command("php", "tests/slow-client.php", "echo", "unix", "200", "0") - w, err := NewSocketFactory(ls, time.Millisecond*100).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "relay timeout") + assert.Contains(t, err.Error(), "context deadline exceeded") } func Test_Unix_Invalid(t *testing.T) { + ctx := context.Background() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -327,12 +345,13 @@ func Test_Unix_Invalid(t *testing.T) { cmd := exec.Command("php", "tests/invalid.php") - w, err := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } func Test_Unix_Broken(t *testing.T) { + ctx := context.Background() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -347,26 +366,40 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "broken", "unix") - w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + if err != nil { + t.Fatal(err) + } + wg := &sync.WaitGroup{} + wg.Add(1) go func() { - err := w.Wait() + defer wg.Done() + err := w.Wait(ctx) assert.Error(t, err) assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { time.Sleep(time.Second) - err = w.Stop() - assert.NoError(t, err) + err = w.Stop(ctx) + assert.Error(t, err) }() - res, err := w.Exec(&Payload{Body: []byte("hello")}) + sw, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + + res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) + wg.Wait() } func Test_Unix_Echo(t *testing.T) { + ctx := context.Background() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -381,18 +414,26 @@ func Test_Unix_Echo(t *testing.T) { cmd := exec.Command("php", "tests/client.php", "echo", "unix") - w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + if err != nil { + t.Fatal(err) + } + //go func() { + // assert.NoError(t, w.Wait()) + //}() defer func() { - err = w.Stop() + err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the worker: error %v", err) + t.Errorf("error stopping the WorkerProcess: error %v", err) } }() - res, err := w.Exec(&Payload{Body: []byte("hello")}) + sw, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + + res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -403,10 +444,11 @@ func Test_Unix_Echo(t *testing.T) { } func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() ls, err := net.Listen("tcp", "localhost:9007") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { b.Errorf("error closing the listener: error %v", err) } @@ -415,29 +457,33 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { b.Skip("socket is busy") } - f := NewSocketFactory(ls, time.Minute) + f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { cmd := exec.Command("php", "tests/client.php", "echo", "tcp") - w, _ := f.SpawnWorker(cmd) - go func() { - if w.Wait() != nil { - b.Fail() - } - }() + w, err := f.SpawnWorkerWithContext(ctx, cmd) + if err != nil { + b.Fatal(err) + } + //go func() { + // if w.Wait() != nil { + // b.Fail() + // } + //}() - err = w.Stop() + err = w.Stop(ctx) if err != nil { - b.Errorf("error stopping the worker: error %v", err) + b.Errorf("error stopping the WorkerProcess: error %v", err) } } } func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() ls, err := net.Listen("tcp", "localhost:9007") if err == nil { defer func() { - err := ls.Close() + err = ls.Close() if err != nil { b.Errorf("error closing the listener: error %v", err) } @@ -448,28 +494,31 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "tests/client.php", "echo", "tcp") - w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) - go func() { - err := w.Wait() - if err != nil { - b.Errorf("error waiting: %v", err) - } - }() + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + if err != nil { + b.Fatal(err) + } defer func() { - err = w.Stop() + err = w.Stop(ctx) if err != nil { - b.Errorf("error stopping the worker: error %v", err) + b.Errorf("error stopping the WorkerProcess: error %v", err) } }() + sw, err := NewSyncWorker(w) + if err != nil { + b.Fatal(err) + } + for n := 0; n < b.N; n++ { - if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil { b.Fail() } } } func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -482,25 +531,23 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { b.Skip("socket is busy") } - f := NewSocketFactory(ls, time.Minute) + f := NewSocketServer(ls, time.Minute) for n := 0; n < b.N; n++ { cmd := exec.Command("php", "tests/client.php", "echo", "unix") - w, _ := f.SpawnWorker(cmd) - go func() { - if w.Wait() != nil { - b.Fail() - } - }() - - err = w.Stop() + w, err := f.SpawnWorkerWithContext(ctx, cmd) if err != nil { - b.Errorf("error stopping the worker: error %v", err) + b.Fatal(err) + } + err = w.Stop(ctx) + if err != nil { + b.Errorf("error stopping the WorkerProcess: error %v", err) } } } func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() ls, err := net.Listen("unix", "sock.unix") if err == nil { defer func() { @@ -515,22 +562,24 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "tests/client.php", "echo", "unix") - w, _ := NewSocketFactory(ls, time.Minute).SpawnWorker(cmd) - go func() { - err := w.Wait() - if err != nil { - b.Errorf("error waiting: %v", err) - } - }() + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + if err != nil { + b.Fatal(err) + } defer func() { - err = w.Stop() + err = w.Stop(ctx) if err != nil { - b.Errorf("error stopping the worker: error %v", err) + b.Errorf("error stopping the WorkerProcess: error %v", err) } }() + sw, err := NewSyncWorker(w) + if err != nil { + b.Fatal(err) + } + for n := 0; n < b.N; n++ { - if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil { b.Fail() } } |