diff options
Diffstat (limited to 'static_pool_test.go')
-rw-r--r-- | static_pool_test.go | 343 |
1 files changed, 343 insertions, 0 deletions
diff --git a/static_pool_test.go b/static_pool_test.go new file mode 100644 index 00000000..c3b3cbba --- /dev/null +++ b/static_pool_test.go @@ -0,0 +1,343 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "log" + "os/exec" + "runtime" + "strconv" + "sync" + "testing" + "time" +) + +var cfg = Config{ + NumWorkers: uint64(runtime.NumCPU()), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, +} + +func Test_NewPool(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + assert.Equal(t, cfg, p.Config()) + + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) +} + +func Test_StaticPool_Invalid(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") }, + NewPipeFactory(), + cfg, + ) + + assert.Nil(t, p) + assert.Error(t, err) +} + +func Test_ConfigError(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + Config{ + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + + assert.Nil(t, p) + assert.Error(t, err) +} + +func Test_StaticPool_Echo(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_NilContext(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + res, err := p.Exec(&Payload{Body: []byte("hello"), Context: nil}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_StaticPool_Echo_Context(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + res, err := p.Exec(&Payload{Body: []byte("hello"), Context: []byte("world")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.Nil(t, res.Body) + assert.NotNil(t, res.Context) + + assert.Equal(t, "world", string(res.Context)) +} + +func Test_StaticPool_JobError(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) + + assert.IsType(t, JobError{}, err) + assert.Equal(t, "hello", err.Error()) +} + +func Test_StaticPool_Broken_Replace(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + p.observer = func(e int, w *Worker, ctx interface{}) { + if err, ok := ctx.(error); ok { + assert.Contains(t, err.Error(), "undefined_function()") + } + } + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) +} + +func Test_StaticPool_AllocateTimeout(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Millisecond * 50, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + done := make(chan interface{}) + go func() { + _, err := p.Exec(&Payload{Body: []byte("100")}) + assert.NoError(t, err) + close(done) + }() + + // to ensure that worker is already busy + time.Sleep(time.Millisecond * 10) + + _, err = p.Exec(&Payload{Body: []byte("10")}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "worker timeout") + + <-done + p.Destroy() +} + +func Test_StaticPool_Replace_Worker(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxExecutions: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + var lastPID string + lastPID = strconv.Itoa(*p.Workers()[0].Pid) + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +// identical to replace but controlled on worker side +func Test_StaticPool_Stop_Worker(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + var lastPID string + lastPID = strconv.Itoa(*p.Workers()[0].Pid) + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + +func Benchmark_Pool_Allocate(b *testing.B) { + p, _ := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + for n := 0; n < b.N; n++ { + w, err := p.allocateWorker() + if err != nil { + b.Fail() + log.Println(err) + } + + p.free <- w + } +} + +func Benchmark_Pool_Echo(b *testing.B) { + p, _ := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + for n := 0; n < b.N; n++ { + if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pool_Echo_Batched(b *testing.B) { + p, _ := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + var wg sync.WaitGroup + for i := 0; i < b.N; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil { + b.Fail() + log.Println(err) + } + }() + } + + wg.Wait() +} + +func Benchmark_Pool_Echo_Replaced(b *testing.B) { + p, _ := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxExecutions: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + defer p.Destroy() + + for n := 0; n < b.N; n++ { + if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil { + b.Fail() + log.Println(err) + } + } +} |