summaryrefslogtreecommitdiff
path: root/pipe_factory_test.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-13 13:55:20 +0300
committerValery Piashchynski <[email protected]>2020-10-13 13:55:20 +0300
commit0dc44d54cfcc9dd3fa09a41136f35a9a8d26b994 (patch)
treeffcb65010bebe9f5b5436192979e64b2402a6ec0 /pipe_factory_test.go
parent08d6b6b7f773f83b286cd48c1a0fbec9a62fb42b (diff)
Initial commit of RR 2.0v2.0.0-alpha1
Diffstat (limited to 'pipe_factory_test.go')
-rw-r--r--pipe_factory_test.go135
1 files changed, 98 insertions, 37 deletions
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index 14cf1272..93d9ccd8 100644
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -1,24 +1,28 @@
package roadrunner
import (
- "github.com/stretchr/testify/assert"
+ "context"
"os/exec"
"testing"
"time"
+
+ "github.com/stretchr/testify/assert"
)
func Test_Pipe_Start(t *testing.T) {
+ ctx := context.Background()
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
go func() {
- assert.NoError(t, w.Wait())
+ ctx := context.Background()
+ assert.NoError(t, w.Wait(ctx))
}()
- assert.NoError(t, w.Stop())
+ assert.NoError(t, w.Stop(ctx))
}
func Test_Pipe_StartError(t *testing.T) {
@@ -28,7 +32,8 @@ func Test_Pipe_StartError(t *testing.T) {
t.Errorf("error running the command: error %v", err)
}
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -40,7 +45,8 @@ func Test_Pipe_PipeError(t *testing.T) {
t.Errorf("error creating the STDIN pipe: error %v", err)
}
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -52,14 +58,16 @@ func Test_Pipe_PipeError2(t *testing.T) {
t.Errorf("error creating the STDIN pipe: error %v", err)
}
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "tests/failboot.php")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
@@ -68,27 +76,32 @@ func Test_Pipe_Failboot(t *testing.T) {
func Test_Pipe_Invalid(t *testing.T) {
cmd := exec.Command("php", "tests/invalid.php")
-
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
func Test_Pipe_Echo(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
defer func() {
- err := w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
}
}()
- res, err := w.Exec(&Payload{Body: []byte("hello")})
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -100,38 +113,41 @@ func Test_Pipe_Echo(t *testing.T) {
func Test_Pipe_Broken(t *testing.T) {
cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- err := w.Wait()
-
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
- }()
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
defer func() {
time.Sleep(time.Second)
- err := w.Stop()
- assert.NoError(t, err)
+ err = w.Stop(ctx)
+ assert.Error(t, err)
}()
- res, err := w.Exec(&Payload{Body: []byte("hello")})
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
}
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
f := NewPipeFactory()
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- w, _ := f.SpawnWorker(cmd)
+ w, _ := f.SpawnWorkerWithContext(context.Background(), cmd)
go func() {
- if w.Wait() != nil {
+ if w.Wait(context.Background()) != nil {
b.Fail()
}
}()
- err := w.Stop()
+ err := w.Stop(context.Background())
if err != nil {
b.Errorf("error stopping the worker: error %v", err)
}
@@ -141,22 +157,67 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorker(cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd)
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ b.Fatal(err)
+ }
+ b.ReportAllocs()
+ b.ResetTimer()
go func() {
- err := w.Wait()
+ err := w.Wait(context.Background())
if err != nil {
b.Errorf("error waiting the worker: error %v", err)
}
}()
defer func() {
- err := w.Stop()
+ err := w.Stop(context.Background())
if err != nil {
b.Errorf("error stopping the worker: error %v", err)
}
}()
for n := 0; n < b.N; n++ {
- if _, err := w.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(context.Background(), Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+}
+
+func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ //go func() {
+ // for {
+ // select {
+ // case event := <-w.Events():
+ // b.Fatal(event)
+ // }
+ // }
+ // //err := w.Wait()
+ // //if err != nil {
+ // // b.Errorf("error waiting the WorkerProcess: error %v", err)
+ // //}
+ //}()
+ defer func() {
+ err = w.Stop(ctx)
+ if err != nil {
+ b.Errorf("error stopping the WorkerProcess: error %v", err)
+ }
+ }()
+
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}