diff options
author | Valery Piashchynski <[email protected]> | 2020-12-22 23:02:25 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-22 23:02:25 +0300 |
commit | fd1e98bc6339abfa66523bf9d2208d00df8ee4bc (patch) | |
tree | b679441276717e687a5b460ebeba7ad0eee69be9 /pkg/pool/static_pool_test.go | |
parent | 40b6c3169931a3fef62b649db19ff01dc685b7d4 (diff) |
events listeners refactor, CLI initial commit
Diffstat (limited to 'pkg/pool/static_pool_test.go')
-rwxr-xr-x | pkg/pool/static_pool_test.go | 137 |
1 files changed, 74 insertions, 63 deletions
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 30345aee..dcc930f6 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -27,10 +27,10 @@ var cfg = Config{ func Test_NewPool(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), cfg, ) assert.NoError(t, err) @@ -41,10 +41,10 @@ func Test_NewPool(t *testing.T) { } func Test_StaticPool_Invalid(t *testing.T) { - p, err := NewPool( + p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), cfg, ) @@ -53,10 +53,10 @@ func Test_StaticPool_Invalid(t *testing.T) { } func Test_ConfigNoErrorInitDefaults(t *testing.T) { - p, err := NewPool( + p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -69,10 +69,10 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { func Test_StaticPool_Echo(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), cfg, ) assert.NoError(t, err) @@ -93,10 +93,10 @@ func Test_StaticPool_Echo(t *testing.T) { func Test_StaticPool_Echo_NilContext(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), cfg, ) assert.NoError(t, err) @@ -117,10 +117,10 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { func Test_StaticPool_Echo_Context(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), cfg, ) assert.NoError(t, err) @@ -141,10 +141,10 @@ func Test_StaticPool_Echo_Context(t *testing.T) { func Test_StaticPool_JobError(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), cfg, ) assert.NoError(t, err) @@ -167,18 +167,9 @@ func Test_StaticPool_JobError(t *testing.T) { func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() - p, err := NewPool( - ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, - pipe.NewPipeFactory(), - cfg, - ) - assert.NoError(t, err) - assert.NotNil(t, p) - block := make(chan struct{}, 1) - p.AddListener(func(event interface{}) { + listener := func(event interface{}) { if wev, ok := event.(events.WorkerEvent); ok { if wev.Event == events.EventWorkerLog { e := string(wev.Payload.([]byte)) @@ -188,7 +179,17 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { } } } - }) + } + + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, + pipe.NewPipeFactory(listener), + cfg, + AddListeners(listener), + ) + assert.NoError(t, err) + assert.NotNil(t, p) time.Sleep(time.Second) res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) @@ -203,11 +204,28 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx := context.Background() - p, err := NewPool( + // Consume pool events + ev := make(chan struct{}, 1) + listener := func(event interface{}) { + if pe, ok := event.(events.PoolEvent); ok { + if pe.Event == events.EventWorkerConstruct { + ev <- struct{}{} + } + } + } + + var cfg = Config{ + NumWorkers: 1, + AllocateTimeout: time.Second * 5, + DestroyTimeout: time.Second * 5, + } + + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), cfg, + AddListeners(listener), ) assert.NoError(t, err) defer p.Destroy(ctx) @@ -222,39 +240,30 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.Empty(t, res.Context) assert.Equal(t, "hello", res.String()) - assert.Equal(t, runtime.NumCPU(), len(p.Workers())) - - // Consume pool events - wg := sync.WaitGroup{} - wg.Add(1) - p.AddListener(func(event interface{}) { - if pe, ok := event.(events.PoolEvent); ok { - if pe.Event == events.EventWorkerConstruct { - wg.Done() - } - } - }) + assert.Equal(t, 1, len(p.Workers())) + // first creation + <-ev // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill() if err != nil { t.Errorf("error killing the process: error %v", err) } - wg.Wait() + // re-creation + <-ev list := p.Workers() for _, w := range list { assert.Equal(t, internal.StateReady, w.State().Value()) } - wg.Wait() } func Test_StaticPool_AllocateTimeout(t *testing.T) { - p, err := NewPool( + p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, @@ -270,10 +279,10 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { func Test_StaticPool_Replace_Worker(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ NumWorkers: 1, MaxJobs: 1, @@ -307,10 +316,10 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { func Test_StaticPool_Debug_Worker(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ Debug: true, AllocateTimeout: time.Second, @@ -347,10 +356,10 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { // identical to replace but controlled on worker side func Test_StaticPool_Stop_Worker(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -387,10 +396,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { // identical to replace but controlled on worker side func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -409,10 +418,10 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { // identical to replace but controlled on worker side func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -439,10 +448,10 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { // identical to replace but controlled on worker side func Test_Static_Pool_Handle_Dead(t *testing.T) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -464,10 +473,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { // identical to replace but controlled on worker side func Test_Static_Pool_Slow_Destroy(t *testing.T) { - p, err := NewPool( + p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -483,10 +492,10 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { func Benchmark_Pool_Echo(b *testing.B) { ctx := context.Background() - p, err := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), cfg, ) if err != nil { @@ -505,16 +514,17 @@ func Benchmark_Pool_Echo(b *testing.B) { // func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx := context.Background() - p, _ := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, }, ) + assert.NoError(b, err) defer p.Destroy(ctx) var wg sync.WaitGroup @@ -535,10 +545,10 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { // func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx := context.Background() - p, _ := NewPool( + p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(nil), Config{ NumWorkers: 1, MaxJobs: 1, @@ -546,6 +556,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { DestroyTimeout: time.Second, }, ) + assert.NoError(b, err) defer p.Destroy(ctx) b.ResetTimer() b.ReportAllocs() |