diff options
Diffstat (limited to 'static_pool_test.go')
-rwxr-xr-x[-rw-r--r--] | static_pool_test.go | 304 |
1 files changed, 181 insertions, 123 deletions
diff --git a/static_pool_test.go b/static_pool_test.go index 59822186..747f26c4 100644..100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -1,7 +1,7 @@ package roadrunner import ( - "github.com/stretchr/testify/assert" + "context" "log" "os/exec" "runtime" @@ -10,31 +10,35 @@ import ( "sync" "testing" "time" + + "github.com/spiral/errors" + "github.com/stretchr/testify/assert" ) -var cfg = Config{ +var cfg = PoolConfig{ NumWorkers: int64(runtime.NumCPU()), - AllocateTimeout: time.Second, - DestroyTimeout: time.Second, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, } func Test_NewPool(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) assert.NoError(t, err) - assert.Equal(t, cfg, p.Config()) - - defer p.Destroy() + defer p.Destroy(ctx) assert.NotNil(t, p) } func Test_StaticPool_Invalid(t *testing.T) { p, err := NewPool( + context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") }, NewPipeFactory(), cfg, @@ -44,33 +48,36 @@ func Test_StaticPool_Invalid(t *testing.T) { assert.Error(t, err) } -func Test_ConfigError(t *testing.T) { +func Test_ConfigNoErrorInitDefaults(t *testing.T) { p, err := NewPool( + context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, ) - assert.Nil(t, p) - assert.Error(t, err) + assert.NotNil(t, p) + assert.NoError(t, err) } func Test_StaticPool_Echo(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) assert.NoError(t, err) - defer p.Destroy() + defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.Exec(&Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -81,18 +88,20 @@ func Test_StaticPool_Echo(t *testing.T) { } func Test_StaticPool_Echo_NilContext(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) assert.NoError(t, err) - defer p.Destroy() + defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.Exec(&Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -103,18 +112,20 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { } func Test_StaticPool_Echo_Context(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") }, NewPipeFactory(), cfg, ) assert.NoError(t, err) - defer p.Destroy() + defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.Exec(&Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -125,27 +136,35 @@ func Test_StaticPool_Echo_Context(t *testing.T) { } func Test_StaticPool_JobError(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") }, NewPipeFactory(), cfg, ) assert.NoError(t, err) - defer p.Destroy() + defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.Exec(&Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res) + assert.Nil(t, res.Body) + assert.Nil(t, res.Context) + + if errors.Is(errors.ErrSoftJob, err) == false { + t.Fatal("error should be of type errors.Exec") + } - assert.IsType(t, JobError{}, err) - assert.Equal(t, "hello", err.Error()) + assert.Contains(t, err.Error(), "hello") } func Test_StaticPool_Broken_Replace(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, NewPipeFactory(), cfg, @@ -153,38 +172,44 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - done := make(chan interface{}) + block := make(chan struct{}) - p.Listen(func(e int, ctx interface{}) { - if err, ok := ctx.(error); ok { - if strings.Contains(err.Error(), "undefined_function()") { - close(done) + p.AddListener(func(event interface{}) { + if wev, ok := event.(WorkerEvent); ok { + if wev.Event == EventWorkerLog { + e := string(wev.Payload.([]byte)) + if strings.ContainsAny(e, "undefined_function()") { + block <- struct{}{} + return + } } } }) - res, err := p.Exec(&Payload{Body: []byte("hello")}) - + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) - assert.Nil(t, res) + assert.Nil(t, res.Context) + assert.Nil(t, res.Body) - <-done - p.Destroy() -} + <-block + p.Destroy(ctx) +} func Test_StaticPool_Broken_FromOutside(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) assert.NoError(t, err) - defer p.Destroy() + defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.Exec(&Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -194,88 +219,115 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.Equal(t, "hello", res.String()) assert.Equal(t, runtime.NumCPU(), len(p.Workers())) - destructed := make(chan interface{}) - p.Listen(func(e int, ctx interface{}) { - if e == EventWorkerConstruct { - destructed <- nil + // Consume pool events + wg := sync.WaitGroup{} + wg.Add(1) + p.AddListener(func(event interface{}) { + if pe, ok := event.(PoolEvent); ok { + if pe.Event == EventWorkerConstruct { + wg.Done() + } } }) // killing random worker and expecting pool to replace it - err = p.Workers()[0].cmd.Process.Kill() + err = p.Workers()[0].Kill() if err != nil { t.Errorf("error killing the process: error %v", err) } - <-destructed - for _, w := range p.Workers() { - assert.Equal(t, StateReady, w.state.Value()) + wg.Wait() + + list := p.Workers() + for _, w := range list { + assert.Equal(t, StateReady, w.State().Value()) } + wg.Wait() } func Test_StaticPool_AllocateTimeout(t *testing.T) { p, err := NewPool( + context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, }, ) - if err != nil { - t.Fatal(err) + assert.Error(t, err) + if !errors.Is(errors.WorkerAllocate, err) { + t.Fatal("error should be of type WorkerAllocate") } + assert.Nil(t, p) +} - done := make(chan interface{}) - go func() { - if p != nil { - _, err := p.Exec(&Payload{Body: []byte("100")}) - assert.NoError(t, err) - close(done) - } else { - panic("Pool is nil") - } - }() +func Test_StaticPool_Replace_Worker(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, + NewPipeFactory(), + PoolConfig{ + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + assert.NotNil(t, p) - // to ensure that worker is already busy - time.Sleep(time.Millisecond * 10) + var lastPID string + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - _, err = p.Exec(&Payload{Body: []byte("10")}) - if err == nil { - t.Fatal("Test_StaticPool_AllocateTimeout exec should raise error") - } - assert.Contains(t, err.Error(), "worker timeout") + res, _ := p.Exec(Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(Payload{Body: []byte("hello")}) - <-done - p.Destroy() + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } } -func Test_StaticPool_Replace_Worker(t *testing.T) { +func Test_StaticPool_Debug_Worker(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, NewPipeFactory(), - Config{ - NumWorkers: 1, - MaxJobs: 1, + PoolConfig{ + Debug: true, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, ) assert.NoError(t, err) - defer p.Destroy() + defer p.Destroy(ctx) assert.NotNil(t, p) + assert.Len(t, p.Workers(), 0) + var lastPID string - lastPID = strconv.Itoa(*p.Workers()[0].Pid) + res, _ := p.Exec(Payload{Body: []byte("hello")}) + assert.NotEqual(t, lastPID, string(res.Body)) - res, _ := p.Exec(&Payload{Body: []byte("hello")}) - assert.Equal(t, lastPID, string(res.Body)) + assert.Len(t, p.Workers(), 0) for i := 0; i < 10; i++ { - res, err := p.Exec(&Payload{Body: []byte("hello")}) + assert.Len(t, p.Workers(), 0) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -289,28 +341,33 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { // identical to replace but controlled on worker side func Test_StaticPool_Stop_Worker(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, ) assert.NoError(t, err) - defer p.Destroy() + defer p.Destroy(ctx) assert.NotNil(t, p) var lastPID string - lastPID = strconv.Itoa(*p.Workers()[0].Pid) + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.Exec(&Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) + if err != nil { + t.Fatal(err) + } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(&Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -324,10 +381,12 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { // identical to replace but controlled on worker side func Test_Static_Pool_Destroy_And_Close(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -337,17 +396,19 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NotNil(t, p) assert.NoError(t, err) - p.Destroy() - _, err = p.Exec(&Payload{Body: []byte("100")}) + p.Destroy(ctx) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } // identical to replace but controlled on worker side func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { + ctx := context.Background() p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -358,49 +419,51 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.Exec(&Payload{Body: []byte("100")}) + _, err := p.Exec(Payload{Body: []byte("100")}) if err != nil { t.Errorf("error executing payload: error %v", err) } - }() time.Sleep(time.Millisecond * 10) - p.Destroy() - _, err = p.Exec(&Payload{Body: []byte("100")}) + p.Destroy(ctx) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } // identical to replace but controlled on worker side func Test_Static_Pool_Handle_Dead(t *testing.T) { + ctx := context.Background() p, err := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + context.Background(), + func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, ) assert.NoError(t, err) - defer p.Destroy() + defer p.Destroy(ctx) assert.NotNil(t, p) - for _, w := range p.workers { - w.state.value = StateErrored + for _, w := range p.Workers() { + w.State().Set(StateErrored) } - _, err = p.Exec(&Payload{Body: []byte("hello")}) + _, err = p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) } // identical to replace but controlled on worker side func Test_Static_Pool_Slow_Destroy(t *testing.T) { p, err := NewPool( + context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -410,61 +473,51 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - p.Destroy() -} - -func Benchmark_Pool_Allocate(b *testing.B) { - p, _ := NewPool( - func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, - NewPipeFactory(), - cfg, - ) - defer p.Destroy() - - for n := 0; n < b.N; n++ { - w, err := p.allocateWorker() - if err != nil { - b.Fail() - log.Println(err) - } - - p.free <- w - } + p.Destroy(context.Background()) } func Benchmark_Pool_Echo(b *testing.B) { - p, _ := NewPool( + ctx := context.Background() + p, err := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), cfg, ) - defer p.Destroy() + if err != nil { + b.Fatal(err) + } + b.ResetTimer() + b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } } +// func Benchmark_Pool_Echo_Batched(b *testing.B) { + ctx := context.Background() p, _ := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, }, ) - defer p.Destroy() + defer p.Destroy(ctx) var wg sync.WaitGroup for i := 0; i < b.N; i++ { wg.Add(1) go func() { defer wg.Done() - if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -474,21 +527,26 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Wait() } +// func Benchmark_Pool_Echo_Replaced(b *testing.B) { + ctx := context.Background() p, _ := NewPool( + ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - Config{ + PoolConfig{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, ) - defer p.Destroy() + defer p.Destroy(ctx) + b.ResetTimer() + b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } |