summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/pool/static_pool_test.go')
-rwxr-xr-xpkg/pool/static_pool_test.go99
1 files changed, 55 insertions, 44 deletions
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 30345aee..acdd6ab7 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -27,7 +27,7 @@ var cfg = Config{
func Test_NewPool(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -41,7 +41,7 @@ func Test_NewPool(t *testing.T) {
}
func Test_StaticPool_Invalid(t *testing.T) {
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") },
pipe.NewPipeFactory(),
@@ -53,7 +53,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
}
func Test_ConfigNoErrorInitDefaults(t *testing.T) {
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -69,7 +69,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
func Test_StaticPool_Echo(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -93,7 +93,7 @@ func Test_StaticPool_Echo(t *testing.T) {
func Test_StaticPool_Echo_NilContext(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -117,7 +117,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
func Test_StaticPool_Echo_Context(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") },
pipe.NewPipeFactory(),
@@ -141,7 +141,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
func Test_StaticPool_JobError(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
@@ -167,18 +167,9 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
- pipe.NewPipeFactory(),
- cfg,
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
block := make(chan struct{}, 1)
- p.AddListener(func(event interface{}) {
+ listener := func(event interface{}) {
if wev, ok := event.(events.WorkerEvent); ok {
if wev.Event == events.EventWorkerLog {
e := string(wev.Payload.([]byte))
@@ -188,7 +179,17 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
}
}
}
- })
+ }
+
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
+ pipe.NewPipeFactory(),
+ cfg,
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
time.Sleep(time.Second)
res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
@@ -203,11 +204,28 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ // Consume pool events
+ ev := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if pe, ok := event.(events.PoolEvent); ok {
+ if pe.Event == events.EventWorkerConstruct {
+ ev <- struct{}{}
+ }
+ }
+ }
+
+ var cfg = Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
+ }
+
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
cfg,
+ AddListeners(listener),
)
assert.NoError(t, err)
defer p.Destroy(ctx)
@@ -222,36 +240,27 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
- assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
-
- // Consume pool events
- wg := sync.WaitGroup{}
- wg.Add(1)
- p.AddListener(func(event interface{}) {
- if pe, ok := event.(events.PoolEvent); ok {
- if pe.Event == events.EventWorkerConstruct {
- wg.Done()
- }
- }
- })
+ assert.Equal(t, 1, len(p.Workers()))
+ // first creation
+ <-ev
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill()
if err != nil {
t.Errorf("error killing the process: error %v", err)
}
- wg.Wait()
+ // re-creation
+ <-ev
list := p.Workers()
for _, w := range list {
assert.Equal(t, internal.StateReady, w.State().Value())
}
- wg.Wait()
}
func Test_StaticPool_AllocateTimeout(t *testing.T) {
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
@@ -270,7 +279,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
@@ -307,7 +316,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
@@ -347,7 +356,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
pipe.NewPipeFactory(),
@@ -387,7 +396,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
@@ -409,7 +418,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
@@ -439,7 +448,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Handle_Dead(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -464,7 +473,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Slow_Destroy(t *testing.T) {
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -483,7 +492,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
func Benchmark_Pool_Echo(b *testing.B) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -505,7 +514,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
//
func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx := context.Background()
- p, _ := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -515,6 +524,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
DestroyTimeout: time.Second,
},
)
+ assert.NoError(b, err)
defer p.Destroy(ctx)
var wg sync.WaitGroup
@@ -535,7 +545,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
//
func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx := context.Background()
- p, _ := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -546,6 +556,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
DestroyTimeout: time.Second,
},
)
+ assert.NoError(b, err)
defer p.Destroy(ctx)
b.ResetTimer()
b.ReportAllocs()