summaryrefslogtreecommitdiff
path: root/worker_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker_test.go')
-rw-r--r--worker_test.go279
1 files changed, 104 insertions, 175 deletions
diff --git a/worker_test.go b/worker_test.go
index c21e67cb..a90b7ef2 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -1,18 +1,21 @@
package roadrunner
import (
- "github.com/stretchr/testify/assert"
+ "context"
"os/exec"
+ "sync"
"testing"
- "time"
+
+ "github.com/stretchr/testify/assert"
)
func Test_GetState(t *testing.T) {
+ ctx := context.Background()
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
go func() {
- assert.NoError(t, w.Wait())
+ assert.NoError(t, w.Wait(ctx))
assert.Equal(t, StateStopped, w.State().Value())
}()
@@ -20,229 +23,155 @@ func Test_GetState(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, StateReady, w.State().Value())
- err = w.Stop()
+ err = w.Stop(ctx)
if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
+ t.Errorf("error stopping the WorkerProcess: error %v", err)
}
}
func Test_Kill(t *testing.T) {
+ ctx := context.Background()
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
go func() {
- assert.Error(t, w.Wait())
- assert.Equal(t, StateStopped, w.State().Value())
+ defer wg.Done()
+ assert.Error(t, w.Wait(ctx))
+ // TODO changed from stopped, discuss
+ assert.Equal(t, StateErrored, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
assert.Equal(t, StateReady, w.State().Value())
- defer func() {
- err := w.Kill()
- if err != nil {
- t.Errorf("error killing the worker: error %v", err)
- }
- }()
+ err = w.Kill(ctx)
+ if err != nil {
+ t.Errorf("error killing the WorkerProcess: error %v", err)
+ }
+ wg.Wait()
}
-func Test_Echo(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
- }
- }()
-
- res, err := w.Exec(&Payload{Body: []byte("hello")})
+func Test_OnStarted(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ assert.Nil(t, cmd.Start())
- assert.Nil(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
+ w, err := InitBaseWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
- assert.Equal(t, "hello", res.String())
+ assert.Equal(t, "can't attach to running process", err.Error())
}
-func Test_BadPayload(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
+func TestErrBuffer_Write_Len(t *testing.T) {
+ buf := newErrBuffer(nil)
defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
- }
+ buf.Close()
}()
- res, err := w.Exec(nil)
-
- assert.Error(t, err)
- assert.Nil(t, res)
-
- assert.Equal(t, "payload can not be empty", err.Error())
-}
-
-func Test_NotStarted_String(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := newWorker(cmd)
- assert.Contains(t, w.String(), "php tests/client.php echo pipes")
- assert.Contains(t, w.String(), "inactive")
- assert.Contains(t, w.String(), "numExecs: 0")
+ _, err := buf.Write([]byte("hello"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
+ assert.Equal(t, 5, buf.Len())
+ assert.Equal(t, "hello", buf.String())
}
-func Test_NotStarted_Exec(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := newWorker(cmd)
-
- res, err := w.Exec(&Payload{Body: []byte("hello")})
-
- assert.Error(t, err)
- assert.Nil(t, res)
+func TestErrBuffer_Write_Event(t *testing.T) {
+ buf := newErrBuffer(nil)
+ defer func() {
+ buf.Close()
+ }()
- assert.Equal(t, "worker is not ready (inactive)", err.Error())
-}
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+ buf.logCallback = func(log []byte) {
+ assert.Equal(t, []byte("hello\n"), log)
+ wg.Done()
+ }
-func Test_String(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ _, err := buf.Write([]byte("hello\n"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
- }
- }()
+ wg.Wait()
- assert.Contains(t, w.String(), "php tests/client.php echo pipes")
- assert.Contains(t, w.String(), "ready")
- assert.Contains(t, w.String(), "numExecs: 0")
+ // messages are read
+ assert.Equal(t, 0, buf.Len())
}
-func Test_Echo_Slow(t *testing.T) {
- cmd := exec.Command("php", "tests/slow-client.php", "echo", "pipes", "10", "10")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
+func TestErrBuffer_Write_Event_Separated(t *testing.T) {
+ buf := newErrBuffer(nil)
defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
- }
+ buf.Close()
}()
- res, err := w.Exec(&Payload{Body: []byte("hello")})
-
- assert.Nil(t, err)
- assert.NotNil(t, res)
- assert.NotNil(t, res.Body)
- assert.Nil(t, res.Context)
-
- assert.Equal(t, "hello", res.String())
-}
-
-func Test_Broken(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ buf.logCallback = func(log []byte) {
+ assert.Equal(t, []byte("hello\nending"), log)
+ wg.Done()
+ }
+ _, err := buf.Write([]byte("hel"))
if err != nil {
- t.Fatal(err)
+ t.Errorf("fail to write: error %v", err)
}
- go func() {
- err := w.Wait()
- assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
- }()
-
- res, err := w.Exec(&Payload{Body: []byte("hello")})
- assert.Nil(t, res)
- assert.NotNil(t, err)
-
- time.Sleep(time.Second)
- assert.NoError(t, w.Stop())
-}
-
-func Test_OnStarted(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
- assert.Nil(t, cmd.Start())
+ _, err = buf.Write([]byte("lo\n"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
- w, err := newWorker(cmd)
- assert.Nil(t, w)
- assert.NotNil(t, err)
+ _, err = buf.Write([]byte("ending"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
- assert.Equal(t, "can't attach to running process", err.Error())
+ wg.Wait()
+ assert.Equal(t, 0, buf.Len())
+ assert.Equal(t, "", buf.String())
}
-func Test_Error(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "error", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
-
+func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) {
+ buf := newErrBuffer(nil)
defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
- }
+ buf.Close()
}()
- res, err := w.Exec(&Payload{Body: []byte("hello")})
- assert.Nil(t, res)
- assert.NotNil(t, err)
-
- assert.IsType(t, JobError{}, err)
- assert.Equal(t, "hello", err.Error())
-}
-
-func Test_NumExecs(t *testing.T) {
- cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
-
- w, _ := NewPipeFactory().SpawnWorker(cmd)
- go func() {
- assert.NoError(t, w.Wait())
- }()
- defer func() {
- err := w.Stop()
- if err != nil {
- t.Errorf("error stopping the worker: error %v", err)
- }
- }()
+ _, err := buf.Write([]byte("hel"))
+ if err != nil {
+ t.Errorf("fail to write: error %v", err)
+ }
- _, err := w.Exec(&Payload{Body: []byte("hello")})
+ _, err = buf.Write([]byte("lo\n"))
if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
+ t.Errorf("fail to write: error %v", err)
}
- assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = w.Exec(&Payload{Body: []byte("hello")})
+ _, err = buf.Write([]byte("ending"))
if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
+ t.Errorf("fail to write: error %v", err)
}
- assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = w.Exec(&Payload{Body: []byte("hello")})
+ assert.Equal(t, 12, buf.Len())
+ assert.Equal(t, "hello\nending", buf.String())
+}
+
+func TestErrBuffer_Write_Remaining(t *testing.T) {
+ buf := newErrBuffer(nil)
+ defer func() {
+ buf.Close()
+ }()
+
+ _, err := buf.Write([]byte("hel"))
if err != nil {
- t.Errorf("fail to execute payload: error %v", err)
+ t.Errorf("fail to write: error %v", err)
}
- assert.Equal(t, int64(3), w.State().NumExecs())
+
+ assert.Equal(t, 3, buf.Len())
+ assert.Equal(t, "hel", buf.String())
}