summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool_test.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-22 23:02:25 +0300
committerValery Piashchynski <[email protected]>2020-12-22 23:02:25 +0300
commitfd1e98bc6339abfa66523bf9d2208d00df8ee4bc (patch)
treeb679441276717e687a5b460ebeba7ad0eee69be9 /pkg/pool/static_pool_test.go
parent40b6c3169931a3fef62b649db19ff01dc685b7d4 (diff)
events listeners refactor, CLI initial commit
Diffstat (limited to 'pkg/pool/static_pool_test.go')
-rwxr-xr-xpkg/pool/static_pool_test.go137
1 files changed, 74 insertions, 63 deletions
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 30345aee..dcc930f6 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -27,10 +27,10 @@ var cfg = Config{
func Test_NewPool(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
cfg,
)
assert.NoError(t, err)
@@ -41,10 +41,10 @@ func Test_NewPool(t *testing.T) {
}
func Test_StaticPool_Invalid(t *testing.T) {
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
cfg,
)
@@ -53,10 +53,10 @@ func Test_StaticPool_Invalid(t *testing.T) {
}
func Test_ConfigNoErrorInitDefaults(t *testing.T) {
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -69,10 +69,10 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
func Test_StaticPool_Echo(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
cfg,
)
assert.NoError(t, err)
@@ -93,10 +93,10 @@ func Test_StaticPool_Echo(t *testing.T) {
func Test_StaticPool_Echo_NilContext(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
cfg,
)
assert.NoError(t, err)
@@ -117,10 +117,10 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
func Test_StaticPool_Echo_Context(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
cfg,
)
assert.NoError(t, err)
@@ -141,10 +141,10 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
func Test_StaticPool_JobError(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
cfg,
)
assert.NoError(t, err)
@@ -167,18 +167,9 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
- ctx,
- func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
- pipe.NewPipeFactory(),
- cfg,
- )
- assert.NoError(t, err)
- assert.NotNil(t, p)
-
block := make(chan struct{}, 1)
- p.AddListener(func(event interface{}) {
+ listener := func(event interface{}) {
if wev, ok := event.(events.WorkerEvent); ok {
if wev.Event == events.EventWorkerLog {
e := string(wev.Payload.([]byte))
@@ -188,7 +179,17 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
}
}
}
- })
+ }
+
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
+ pipe.NewPipeFactory(listener),
+ cfg,
+ AddListeners(listener),
+ )
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
time.Sleep(time.Second)
res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
@@ -203,11 +204,28 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ // Consume pool events
+ ev := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if pe, ok := event.(events.PoolEvent); ok {
+ if pe.Event == events.EventWorkerConstruct {
+ ev <- struct{}{}
+ }
+ }
+ }
+
+ var cfg = Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
+ }
+
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
cfg,
+ AddListeners(listener),
)
assert.NoError(t, err)
defer p.Destroy(ctx)
@@ -222,39 +240,30 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Empty(t, res.Context)
assert.Equal(t, "hello", res.String())
- assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
-
- // Consume pool events
- wg := sync.WaitGroup{}
- wg.Add(1)
- p.AddListener(func(event interface{}) {
- if pe, ok := event.(events.PoolEvent); ok {
- if pe.Event == events.EventWorkerConstruct {
- wg.Done()
- }
- }
- })
+ assert.Equal(t, 1, len(p.Workers()))
+ // first creation
+ <-ev
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill()
if err != nil {
t.Errorf("error killing the process: error %v", err)
}
- wg.Wait()
+ // re-creation
+ <-ev
list := p.Workers()
for _, w := range list {
assert.Equal(t, internal.StateReady, w.State().Value())
}
- wg.Wait()
}
func Test_StaticPool_AllocateTimeout(t *testing.T) {
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
@@ -270,10 +279,10 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
NumWorkers: 1,
MaxJobs: 1,
@@ -307,10 +316,10 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
Debug: true,
AllocateTimeout: time.Second,
@@ -347,10 +356,10 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -387,10 +396,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -409,10 +418,10 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -439,10 +448,10 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Handle_Dead(t *testing.T) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -464,10 +473,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Slow_Destroy(t *testing.T) {
- p, err := NewPool(
+ p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -483,10 +492,10 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
func Benchmark_Pool_Echo(b *testing.B) {
ctx := context.Background()
- p, err := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
cfg,
)
if err != nil {
@@ -505,16 +514,17 @@ func Benchmark_Pool_Echo(b *testing.B) {
//
func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx := context.Background()
- p, _ := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
},
)
+ assert.NoError(b, err)
defer p.Destroy(ctx)
var wg sync.WaitGroup
@@ -535,10 +545,10 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
//
func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx := context.Background()
- p, _ := NewPool(
+ p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(nil),
Config{
NumWorkers: 1,
MaxJobs: 1,
@@ -546,6 +556,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
DestroyTimeout: time.Second,
},
)
+ assert.NoError(b, err)
defer p.Destroy(ctx)
b.ResetTimer()
b.ReportAllocs()