diff options
Diffstat (limited to 'worker_test.go')
-rw-r--r-- | worker_test.go | 279 |
1 files changed, 104 insertions, 175 deletions
diff --git a/worker_test.go b/worker_test.go index c21e67cb..a90b7ef2 100644 --- a/worker_test.go +++ b/worker_test.go @@ -1,18 +1,21 @@ package roadrunner import ( - "github.com/stretchr/testify/assert" + "context" "os/exec" + "sync" "testing" - "time" + + "github.com/stretchr/testify/assert" ) func Test_GetState(t *testing.T) { + ctx := context.Background() cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) go func() { - assert.NoError(t, w.Wait()) + assert.NoError(t, w.Wait(ctx)) assert.Equal(t, StateStopped, w.State().Value()) }() @@ -20,229 +23,155 @@ func Test_GetState(t *testing.T) { assert.NotNil(t, w) assert.Equal(t, StateReady, w.State().Value()) - err = w.Stop() + err = w.Stop(ctx) if err != nil { - t.Errorf("error stopping the worker: error %v", err) + t.Errorf("error stopping the WorkerProcess: error %v", err) } } func Test_Kill(t *testing.T) { + ctx := context.Background() cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + wg := &sync.WaitGroup{} + wg.Add(1) go func() { - assert.Error(t, w.Wait()) - assert.Equal(t, StateStopped, w.State().Value()) + defer wg.Done() + assert.Error(t, w.Wait(ctx)) + // TODO changed from stopped, discuss + assert.Equal(t, StateErrored, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) assert.Equal(t, StateReady, w.State().Value()) - defer func() { - err := w.Kill() - if err != nil { - t.Errorf("error killing the worker: error %v", err) - } - }() + err = w.Kill(ctx) + if err != nil { + t.Errorf("error killing the WorkerProcess: error %v", err) + } + wg.Wait() } -func Test_Echo(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the worker: error %v", err) - } - }() - - res, err := w.Exec(&Payload{Body: []byte("hello")}) +func Test_OnStarted(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + assert.Nil(t, cmd.Start()) - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) + w, err := InitBaseWorker(cmd) + assert.Nil(t, w) + assert.NotNil(t, err) - assert.Equal(t, "hello", res.String()) + assert.Equal(t, "can't attach to running process", err.Error()) } -func Test_BadPayload(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() +func TestErrBuffer_Write_Len(t *testing.T) { + buf := newErrBuffer(nil) defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the worker: error %v", err) - } + buf.Close() }() - res, err := w.Exec(nil) - - assert.Error(t, err) - assert.Nil(t, res) - - assert.Equal(t, "payload can not be empty", err.Error()) -} - -func Test_NotStarted_String(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := newWorker(cmd) - assert.Contains(t, w.String(), "php tests/client.php echo pipes") - assert.Contains(t, w.String(), "inactive") - assert.Contains(t, w.String(), "numExecs: 0") + _, err := buf.Write([]byte("hello")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } + assert.Equal(t, 5, buf.Len()) + assert.Equal(t, "hello", buf.String()) } -func Test_NotStarted_Exec(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := newWorker(cmd) - - res, err := w.Exec(&Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res) +func TestErrBuffer_Write_Event(t *testing.T) { + buf := newErrBuffer(nil) + defer func() { + buf.Close() + }() - assert.Equal(t, "worker is not ready (inactive)", err.Error()) -} + wg := &sync.WaitGroup{} + wg.Add(1) + buf.logCallback = func(log []byte) { + assert.Equal(t, []byte("hello\n"), log) + wg.Done() + } -func Test_String(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + _, err := buf.Write([]byte("hello\n")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the worker: error %v", err) - } - }() + wg.Wait() - assert.Contains(t, w.String(), "php tests/client.php echo pipes") - assert.Contains(t, w.String(), "ready") - assert.Contains(t, w.String(), "numExecs: 0") + // messages are read + assert.Equal(t, 0, buf.Len()) } -func Test_Echo_Slow(t *testing.T) { - cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() +func TestErrBuffer_Write_Event_Separated(t *testing.T) { + buf := newErrBuffer(nil) defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the worker: error %v", err) - } + buf.Close() }() - res, err := w.Exec(&Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Nil(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Broken(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + wg := &sync.WaitGroup{} + wg.Add(1) - w, err := NewPipeFactory().SpawnWorker(cmd) + buf.logCallback = func(log []byte) { + assert.Equal(t, []byte("hello\nending"), log) + wg.Done() + } + _, err := buf.Write([]byte("hel")) if err != nil { - t.Fatal(err) + t.Errorf("fail to write: error %v", err) } - go func() { - err := w.Wait() - assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") - }() - - res, err := w.Exec(&Payload{Body: []byte("hello")}) - assert.Nil(t, res) - assert.NotNil(t, err) - - time.Sleep(time.Second) - assert.NoError(t, w.Stop()) -} - -func Test_OnStarted(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "broken", "pipes") - assert.Nil(t, cmd.Start()) + _, err = buf.Write([]byte("lo\n")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } - w, err := newWorker(cmd) - assert.Nil(t, w) - assert.NotNil(t, err) + _, err = buf.Write([]byte("ending")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } - assert.Equal(t, "can't attach to running process", err.Error()) + wg.Wait() + assert.Equal(t, 0, buf.Len()) + assert.Equal(t, "", buf.String()) } -func Test_Error(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "error", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - +func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) { + buf := newErrBuffer(nil) defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the worker: error %v", err) - } + buf.Close() }() - res, err := w.Exec(&Payload{Body: []byte("hello")}) - assert.Nil(t, res) - assert.NotNil(t, err) - - assert.IsType(t, JobError{}, err) - assert.Equal(t, "hello", err.Error()) -} - -func Test_NumExecs(t *testing.T) { - cmd := exec.Command("php", "tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the worker: error %v", err) - } - }() + _, err := buf.Write([]byte("hel")) + if err != nil { + t.Errorf("fail to write: error %v", err) + } - _, err := w.Exec(&Payload{Body: []byte("hello")}) + _, err = buf.Write([]byte("lo\n")) if err != nil { - t.Errorf("fail to execute payload: error %v", err) + t.Errorf("fail to write: error %v", err) } - assert.Equal(t, int64(1), w.State().NumExecs()) - _, err = w.Exec(&Payload{Body: []byte("hello")}) + _, err = buf.Write([]byte("ending")) if err != nil { - t.Errorf("fail to execute payload: error %v", err) + t.Errorf("fail to write: error %v", err) } - assert.Equal(t, int64(2), w.State().NumExecs()) - _, err = w.Exec(&Payload{Body: []byte("hello")}) + assert.Equal(t, 12, buf.Len()) + assert.Equal(t, "hello\nending", buf.String()) +} + +func TestErrBuffer_Write_Remaining(t *testing.T) { + buf := newErrBuffer(nil) + defer func() { + buf.Close() + }() + + _, err := buf.Write([]byte("hel")) if err != nil { - t.Errorf("fail to execute payload: error %v", err) + t.Errorf("fail to write: error %v", err) } - assert.Equal(t, int64(3), w.State().NumExecs()) + + assert.Equal(t, 3, buf.Len()) + assert.Equal(t, "hel", buf.String()) } |