summaryrefslogtreecommitdiff
path: root/sync_worker_test.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-13 13:55:20 +0300
committerValery Piashchynski <[email protected]>2020-10-13 13:55:20 +0300
commit0dc44d54cfcc9dd3fa09a41136f35a9a8d26b994 (patch)
treeffcb65010bebe9f5b5436192979e64b2402a6ec0 /sync_worker_test.go
parent08d6b6b7f773f83b286cd48c1a0fbec9a62fb42b (diff)
Initial commit of RR 2.0v2.0.0-alpha1
Diffstat (limited to 'sync_worker_test.go')
-rw-r--r--sync_worker_test.go264
1 files changed, 264 insertions, 0 deletions
diff --git a/sync_worker_test.go b/sync_worker_test.go
new file mode 100644
index 00000000..e1cec4b6
--- /dev/null
+++ b/sync_worker_test.go
@@ -0,0 +1,264 @@
+package roadrunner
+
+import (
+ "context"
+ "errors"
+ "os/exec"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_Echo(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ syncWorker, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+ go func() {
+ assert.NoError(t, w.Wait(ctx))
+ }()
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
+ }
+ }()
+
+ res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_BadPayload(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+
+ syncWorker, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ go func() {
+ assert.NoError(t, w.Wait(ctx))
+ }()
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
+ }
+ }()
+
+ res, err := syncWorker.Exec(ctx, EmptyPayload)
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "payload can not be empty", err.Error())
+}
+
+func Test_NotStarted_String(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+ assert.Contains(t, w.String(), "php tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "inactive")
+ assert.Contains(t, w.String(), "numExecs: 0")
+}
+
+func Test_NotStarted_Exec(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := InitBaseWorker(cmd)
+
+ syncWorker, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+
+ assert.Error(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "WorkerProcess is not ready (inactive)", err.Error())
+}
+
+func Test_String(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait(ctx))
+ }()
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
+ }
+ }()
+
+ assert.Contains(t, w.String(), "php tests/client.php echo pipes")
+ assert.Contains(t, w.String(), "ready")
+ assert.Contains(t, w.String(), "numExecs: 0")
+}
+
+func Test_Echo_Slow(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait(ctx))
+ }()
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
+ }
+ }()
+
+ syncWorker, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Broken(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ assert.NotNil(t, w)
+ tt := time.NewTimer(time.Second * 10)
+ defer wg.Done()
+ for {
+ select {
+ case ev := <-w.Events():
+ assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()")
+ return
+ case <-tt.C:
+ assert.Error(t, errors.New("no events from worker"))
+ return
+ }
+ }
+ }()
+
+ syncWorker, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ assert.NotNil(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ wg.Wait()
+ assert.Error(t, w.Stop(ctx))
+}
+
+func Test_Error(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "tests/client.php", "error", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait(ctx))
+ }()
+
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
+ }
+ }()
+
+ syncWorker, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ assert.NotNil(t, err)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.IsType(t, TaskError{}, err)
+ assert.Equal(t, "hello", err.Error())
+}
+
+func Test_NumExecs(t *testing.T) {
+ ctx := context.Background()
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+
+ w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ go func() {
+ assert.NoError(t, w.Wait(ctx))
+ }()
+ defer func() {
+ err := w.Stop(ctx)
+ if err != nil {
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
+ }
+ }()
+
+ syncWorker, err := NewSyncWorker(w)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, int64(1), w.State().NumExecs())
+
+ _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, int64(2), w.State().NumExecs())
+
+ _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Errorf("fail to execute payload: error %v", err)
+ }
+ assert.Equal(t, int64(3), w.State().NumExecs())
+}