summaryrefslogtreecommitdiff
path: root/static_pool_test.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-26 12:01:53 +0300
committerGitHub <[email protected]>2020-10-26 12:01:53 +0300
commit91cf918b30938129609323ded53e190385e019a6 (patch)
tree0ad9537bd438c63719fb83343ab77fc4ab34eb83 /static_pool_test.go
parent68bf13772c6ddfc5159c2a286e1a38e911614e72 (diff)
parent9aae9e2009bad07ebdee73e1c6cf56901d07880a (diff)
Merge pull request #373 from spiral/feature/new-worker-produces-active-worker
Feature/new worker produces active worker
Diffstat (limited to 'static_pool_test.go')
-rwxr-xr-xstatic_pool_test.go114
1 files changed, 46 insertions, 68 deletions
diff --git a/static_pool_test.go b/static_pool_test.go
index ce9e6820..ec80e92a 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -2,7 +2,6 @@ package roadrunner
import (
"context"
- "fmt"
"log"
"os/exec"
"runtime"
@@ -18,7 +17,6 @@ var cfg = Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
}
func Test_NewPool(t *testing.T) {
@@ -27,12 +25,10 @@ func Test_NewPool(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
- assert.Equal(t, cfg, p.Config())
-
defer p.Destroy(ctx)
assert.NotNil(t, p)
@@ -43,7 +39,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.Nil(t, p)
@@ -55,7 +51,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -71,7 +67,7 @@ func Test_StaticPool_Echo(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
@@ -79,7 +75,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -95,7 +91,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
@@ -103,7 +99,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -119,7 +115,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
@@ -127,7 +123,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -143,20 +139,20 @@ func Test_StaticPool_JobError(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, TaskError{}, err)
+ assert.IsType(t, JobError{}, err)
assert.Equal(t, "hello", err.Error())
}
@@ -167,7 +163,7 @@ func Test_StaticPool_JobError(t *testing.T) {
// ctx,
// func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
// NewPipeFactory(),
-// &cfg,
+// cfg,
// )
// assert.NoError(t, err)
// assert.NotNil(t, p)
@@ -177,6 +173,10 @@ func Test_StaticPool_JobError(t *testing.T) {
// var i int64
// atomic.StoreInt64(&i, 10)
//
+// p.AddListener(func(event interface{}) {
+//
+// })
+//
// go func() {
// for {
// select {
@@ -197,7 +197,7 @@ func Test_StaticPool_JobError(t *testing.T) {
// wg.Wait()
//
// p.Destroy(ctx)
-//}
+// }
//
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
@@ -206,14 +206,14 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -226,17 +226,13 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
// Consume pool events
wg := sync.WaitGroup{}
wg.Add(1)
- go func() {
- for true {
- select {
- case ev := <-p.Events():
- fmt.Println(ev)
- if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct {
- wg.Done()
- }
+ p.AddListener(func(event interface{}) {
+ if pe, ok := event.(PoolEvent); ok {
+ if pe.Event == EventWorkerConstruct {
+ wg.Done()
}
}
- }()
+ })
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill(ctx)
@@ -258,11 +254,10 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
- ExecTTL: time.Second * 4,
},
)
assert.Error(t, err)
@@ -275,12 +270,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 4,
},
)
assert.NoError(t, err)
@@ -291,11 +285,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, _ := p.Exec(Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -314,11 +308,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 15,
},
)
assert.NoError(t, err)
@@ -326,26 +319,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
assert.NotNil(t, p)
- go func() {
- for {
- select {
- case ev := <-p.Events():
- fmt.Println(ev)
- }
- }
- }()
-
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -364,11 +348,10 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 4,
},
)
@@ -376,7 +359,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")})
+ _, err = p.Exec(Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -387,11 +370,10 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 4,
},
)
@@ -399,7 +381,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.ExecWithContext(ctx, Payload{Body: []byte("100")})
+ _, err := p.Exec(Payload{Body: []byte("100")})
if err != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -407,7 +389,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 10)
p.Destroy(ctx)
- _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")})
+ _, err = p.Exec(Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -418,11 +400,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
assert.NoError(t, err)
@@ -434,7 +415,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
w.State().Set(StateErrored)
}
- _, err = p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ _, err = p.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
}
@@ -444,11 +425,10 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
@@ -464,7 +444,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
if err != nil {
b.Fatal(err)
@@ -473,7 +453,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -486,11 +466,10 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
defer p.Destroy(ctx)
@@ -500,7 +479,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -517,12 +496,11 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
defer p.Destroy(ctx)
@@ -530,7 +508,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}