diff options
author | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
commit | 0dc44d54cfcc9dd3fa09a41136f35a9a8d26b994 (patch) | |
tree | ffcb65010bebe9f5b5436192979e64b2402a6ec0 /sync_worker_test.go | |
parent | 08d6b6b7f773f83b286cd48c1a0fbec9a62fb42b (diff) |
Initial commit of RR 2.0v2.0.0-alpha1
Diffstat (limited to 'sync_worker_test.go')
-rw-r--r-- | sync_worker_test.go | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/sync_worker_test.go b/sync_worker_test.go new file mode 100644 index 00000000..e1cec4b6 --- /dev/null +++ b/sync_worker_test.go @@ -0,0 +1,264 @@ +package roadrunner + +import ( + "context" + "errors" + "os/exec" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_Echo(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + syncWorker, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, w.Wait(ctx)) + }() + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the WorkerProcess: error %v", err) + } + }() + + res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + + syncWorker, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + + go func() { + assert.NoError(t, w.Wait(ctx)) + }() + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the WorkerProcess: error %v", err) + } + }() + + res, err := syncWorker.Exec(ctx, EmptyPayload) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "payload can not be empty", err.Error()) +} + +func Test_NotStarted_String(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + assert.Contains(t, w.String(), "php tests/client.php echo pipes") + assert.Contains(t, w.String(), "inactive") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_NotStarted_Exec(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := InitBaseWorker(cmd) + + syncWorker, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + + res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "WorkerProcess is not ready (inactive)", err.Error()) +} + +func Test_String(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + go func() { + assert.NoError(t, w.Wait(ctx)) + }() + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the WorkerProcess: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + go func() { + assert.NoError(t, w.Wait(ctx)) + }() + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the WorkerProcess: error %v", err) + } + }() + + syncWorker, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + + res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + assert.NotNil(t, w) + tt := time.NewTimer(time.Second * 10) + defer wg.Done() + for { + select { + case ev := <-w.Events(): + assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()") + return + case <-tt.C: + assert.Error(t, errors.New("no events from worker")) + return + } + } + }() + + syncWorker, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + + res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + wg.Wait() + assert.Error(t, w.Stop(ctx)) +} + +func Test_Error(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + go func() { + assert.NoError(t, w.Wait(ctx)) + }() + + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the WorkerProcess: error %v", err) + } + }() + + syncWorker, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + + res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + assert.IsType(t, TaskError{}, err) + assert.Equal(t, "hello", err.Error()) +} + +func Test_NumExecs(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + go func() { + assert.NoError(t, w.Wait(ctx)) + }() + defer func() { + err := w.Stop(ctx) + if err != nil { + t.Errorf("error stopping the WorkerProcess: error %v", err) + } + }() + + syncWorker, err := NewSyncWorker(w) + if err != nil { + t.Fatal(err) + } + + _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(1), w.State().NumExecs()) + + _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(2), w.State().NumExecs()) + + _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, int64(3), w.State().NumExecs()) +} |