diff options
author | Valery Piashchynski <[email protected]> | 2020-10-26 12:01:53 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-10-26 12:01:53 +0300 |
commit | 91cf918b30938129609323ded53e190385e019a6 (patch) | |
tree | 0ad9537bd438c63719fb83343ab77fc4ab34eb83 /static_pool_test.go | |
parent | 68bf13772c6ddfc5159c2a286e1a38e911614e72 (diff) | |
parent | 9aae9e2009bad07ebdee73e1c6cf56901d07880a (diff) |
Merge pull request #373 from spiral/feature/new-worker-produces-active-worker
Feature/new worker produces active worker
Diffstat (limited to 'static_pool_test.go')
-rwxr-xr-x | static_pool_test.go | 114 |
1 files changed, 46 insertions, 68 deletions
diff --git a/static_pool_test.go b/static_pool_test.go index ce9e6820..ec80e92a 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -2,7 +2,6 @@ package roadrunner import ( "context" - "fmt" "log" "os/exec" "runtime" @@ -18,7 +17,6 @@ var cfg = Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, } func Test_NewPool(t *testing.T) { @@ -27,12 +25,10 @@ func Test_NewPool(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) - assert.Equal(t, cfg, p.Config()) - defer p.Destroy(ctx) assert.NotNil(t, p) @@ -43,7 +39,7 @@ func Test_StaticPool_Invalid(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") }, NewPipeFactory(), - &cfg, + cfg, ) assert.Nil(t, p) @@ -55,7 +51,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -71,7 +67,7 @@ func Test_StaticPool_Echo(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -79,7 +75,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -95,7 +91,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -103,7 +99,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -119,7 +115,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -127,7 +123,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -143,20 +139,20 @@ func Test_StaticPool_JobError(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, TaskError{}, err) + assert.IsType(t, JobError{}, err) assert.Equal(t, "hello", err.Error()) } @@ -167,7 +163,7 @@ func Test_StaticPool_JobError(t *testing.T) { // ctx, // func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, // NewPipeFactory(), -// &cfg, +// cfg, // ) // assert.NoError(t, err) // assert.NotNil(t, p) @@ -177,6 +173,10 @@ func Test_StaticPool_JobError(t *testing.T) { // var i int64 // atomic.StoreInt64(&i, 10) // +// p.AddListener(func(event interface{}) { +// +// }) +// // go func() { // for { // select { @@ -197,7 +197,7 @@ func Test_StaticPool_JobError(t *testing.T) { // wg.Wait() // // p.Destroy(ctx) -//} +// } // func Test_StaticPool_Broken_FromOutside(t *testing.T) { @@ -206,14 +206,14 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -226,17 +226,13 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { // Consume pool events wg := sync.WaitGroup{} wg.Add(1) - go func() { - for true { - select { - case ev := <-p.Events(): - fmt.Println(ev) - if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct { - wg.Done() - } + p.AddListener(func(event interface{}) { + if pe, ok := event.(PoolEvent); ok { + if pe.Event == EventWorkerConstruct { + wg.Done() } } - }() + }) // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill(ctx) @@ -258,11 +254,10 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, - ExecTTL: time.Second * 4, }, ) assert.Error(t, err) @@ -275,12 +270,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) assert.NoError(t, err) @@ -291,11 +285,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, _ := p.Exec(Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -314,11 +308,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 15, }, ) assert.NoError(t, err) @@ -326,26 +319,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { assert.NotNil(t, p) - go func() { - for { - select { - case ev := <-p.Events(): - fmt.Println(ev) - } - } - }() - var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -364,11 +348,10 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) @@ -376,7 +359,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -387,11 +370,10 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) @@ -399,7 +381,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err := p.Exec(Payload{Body: []byte("100")}) if err != nil { t.Errorf("error executing payload: error %v", err) } @@ -407,7 +389,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 10) p.Destroy(ctx) - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -418,11 +400,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) assert.NoError(t, err) @@ -434,7 +415,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { w.State().Set(StateErrored) } - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) } @@ -444,11 +425,10 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) @@ -464,7 +444,7 @@ func Benchmark_Pool_Echo(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) if err != nil { b.Fatal(err) @@ -473,7 +453,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -486,11 +466,10 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) defer p.Destroy(ctx) @@ -500,7 +479,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -517,12 +496,11 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) defer p.Destroy(ctx) @@ -530,7 +508,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } |