summaryrefslogtreecommitdiff
path: root/static_pool_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'static_pool_test.go')
-rwxr-xr-x[-rw-r--r--]static_pool_test.go304
1 files changed, 181 insertions, 123 deletions
diff --git a/static_pool_test.go b/static_pool_test.go
index 59822186..747f26c4 100644..100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -1,7 +1,7 @@
package roadrunner
import (
- "github.com/stretchr/testify/assert"
+ "context"
"log"
"os/exec"
"runtime"
@@ -10,31 +10,35 @@ import (
"sync"
"testing"
"time"
+
+ "github.com/spiral/errors"
+ "github.com/stretchr/testify/assert"
)
-var cfg = Config{
+var cfg = PoolConfig{
NumWorkers: int64(runtime.NumCPU()),
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second,
+ AllocateTimeout: time.Second * 5,
+ DestroyTimeout: time.Second * 5,
}
func Test_NewPool(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
- assert.Equal(t, cfg, p.Config())
-
- defer p.Destroy()
+ defer p.Destroy(ctx)
assert.NotNil(t, p)
}
func Test_StaticPool_Invalid(t *testing.T) {
p, err := NewPool(
+ context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") },
NewPipeFactory(),
cfg,
@@ -44,33 +48,36 @@ func Test_StaticPool_Invalid(t *testing.T) {
assert.Error(t, err)
}
-func Test_ConfigError(t *testing.T) {
+func Test_ConfigNoErrorInitDefaults(t *testing.T) {
p, err := NewPool(
+ context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- Config{
+ PoolConfig{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
)
- assert.Nil(t, p)
- assert.Error(t, err)
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
}
func Test_StaticPool_Echo(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
- defer p.Destroy()
+ defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.Exec(&Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -81,18 +88,20 @@ func Test_StaticPool_Echo(t *testing.T) {
}
func Test_StaticPool_Echo_NilContext(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
- defer p.Destroy()
+ defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.Exec(&Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -103,18 +112,20 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
}
func Test_StaticPool_Echo_Context(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
- defer p.Destroy()
+ defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.Exec(&Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -125,27 +136,35 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
}
func Test_StaticPool_JobError(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") },
NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
- defer p.Destroy()
+ defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.Exec(&Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res)
+ assert.Nil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ if errors.Is(errors.ErrSoftJob, err) == false {
+ t.Fatal("error should be of type errors.Exec")
+ }
- assert.IsType(t, JobError{}, err)
- assert.Equal(t, "hello", err.Error())
+ assert.Contains(t, err.Error(), "hello")
}
func Test_StaticPool_Broken_Replace(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
NewPipeFactory(),
cfg,
@@ -153,38 +172,44 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- done := make(chan interface{})
+ block := make(chan struct{})
- p.Listen(func(e int, ctx interface{}) {
- if err, ok := ctx.(error); ok {
- if strings.Contains(err.Error(), "undefined_function()") {
- close(done)
+ p.AddListener(func(event interface{}) {
+ if wev, ok := event.(WorkerEvent); ok {
+ if wev.Event == EventWorkerLog {
+ e := string(wev.Payload.([]byte))
+ if strings.ContainsAny(e, "undefined_function()") {
+ block <- struct{}{}
+ return
+ }
}
}
})
- res, err := p.Exec(&Payload{Body: []byte("hello")})
-
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
- assert.Nil(t, res)
+ assert.Nil(t, res.Context)
+ assert.Nil(t, res.Body)
- <-done
- p.Destroy()
-}
+ <-block
+ p.Destroy(ctx)
+}
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
- defer p.Destroy()
+ defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.Exec(&Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -194,88 +219,115 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Equal(t, "hello", res.String())
assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
- destructed := make(chan interface{})
- p.Listen(func(e int, ctx interface{}) {
- if e == EventWorkerConstruct {
- destructed <- nil
+ // Consume pool events
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ p.AddListener(func(event interface{}) {
+ if pe, ok := event.(PoolEvent); ok {
+ if pe.Event == EventWorkerConstruct {
+ wg.Done()
+ }
}
})
// killing random worker and expecting pool to replace it
- err = p.Workers()[0].cmd.Process.Kill()
+ err = p.Workers()[0].Kill()
if err != nil {
t.Errorf("error killing the process: error %v", err)
}
- <-destructed
- for _, w := range p.Workers() {
- assert.Equal(t, StateReady, w.state.Value())
+ wg.Wait()
+
+ list := p.Workers()
+ for _, w := range list {
+ assert.Equal(t, StateReady, w.State().Value())
}
+ wg.Wait()
}
func Test_StaticPool_AllocateTimeout(t *testing.T) {
p, err := NewPool(
+ context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- Config{
+ PoolConfig{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
},
)
- if err != nil {
- t.Fatal(err)
+ assert.Error(t, err)
+ if !errors.Is(errors.WorkerAllocate, err) {
+ t.Fatal("error should be of type WorkerAllocate")
}
+ assert.Nil(t, p)
+}
- done := make(chan interface{})
- go func() {
- if p != nil {
- _, err := p.Exec(&Payload{Body: []byte("100")})
- assert.NoError(t, err)
- close(done)
- } else {
- panic("Pool is nil")
- }
- }()
+func Test_StaticPool_Replace_Worker(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
+ NewPipeFactory(),
+ PoolConfig{
+ NumWorkers: 1,
+ MaxJobs: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+ assert.NotNil(t, p)
- // to ensure that worker is already busy
- time.Sleep(time.Millisecond * 10)
+ var lastPID string
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- _, err = p.Exec(&Payload{Body: []byte("10")})
- if err == nil {
- t.Fatal("Test_StaticPool_AllocateTimeout exec should raise error")
- }
- assert.Contains(t, err.Error(), "worker timeout")
+ res, _ := p.Exec(Payload{Body: []byte("hello")})
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(Payload{Body: []byte("hello")})
- <-done
- p.Destroy()
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
}
-func Test_StaticPool_Replace_Worker(t *testing.T) {
+func Test_StaticPool_Debug_Worker(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
NewPipeFactory(),
- Config{
- NumWorkers: 1,
- MaxJobs: 1,
+ PoolConfig{
+ Debug: true,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
)
assert.NoError(t, err)
- defer p.Destroy()
+ defer p.Destroy(ctx)
assert.NotNil(t, p)
+ assert.Len(t, p.Workers(), 0)
+
var lastPID string
- lastPID = strconv.Itoa(*p.Workers()[0].Pid)
+ res, _ := p.Exec(Payload{Body: []byte("hello")})
+ assert.NotEqual(t, lastPID, string(res.Body))
- res, _ := p.Exec(&Payload{Body: []byte("hello")})
- assert.Equal(t, lastPID, string(res.Body))
+ assert.Len(t, p.Workers(), 0)
for i := 0; i < 10; i++ {
- res, err := p.Exec(&Payload{Body: []byte("hello")})
+ assert.Len(t, p.Workers(), 0)
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -289,28 +341,33 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") },
NewPipeFactory(),
- Config{
+ PoolConfig{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
)
assert.NoError(t, err)
- defer p.Destroy()
+ defer p.Destroy(ctx)
assert.NotNil(t, p)
var lastPID string
- lastPID = strconv.Itoa(*p.Workers()[0].Pid)
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(&Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
+ if err != nil {
+ t.Fatal(err)
+ }
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(&Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -324,10 +381,12 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- Config{
+ PoolConfig{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -337,17 +396,19 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NotNil(t, p)
assert.NoError(t, err)
- p.Destroy()
- _, err = p.Exec(&Payload{Body: []byte("100")})
+ p.Destroy(ctx)
+ _, err = p.Exec(Payload{Body: []byte("100")})
assert.Error(t, err)
}
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- Config{
+ PoolConfig{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -358,49 +419,51 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.Exec(&Payload{Body: []byte("100")})
+ _, err := p.Exec(Payload{Body: []byte("100")})
if err != nil {
t.Errorf("error executing payload: error %v", err)
}
-
}()
time.Sleep(time.Millisecond * 10)
- p.Destroy()
- _, err = p.Exec(&Payload{Body: []byte("100")})
+ p.Destroy(ctx)
+ _, err = p.Exec(Payload{Body: []byte("100")})
assert.Error(t, err)
}
// identical to replace but controlled on worker side
func Test_Static_Pool_Handle_Dead(t *testing.T) {
+ ctx := context.Background()
p, err := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
NewPipeFactory(),
- Config{
+ PoolConfig{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
)
assert.NoError(t, err)
- defer p.Destroy()
+ defer p.Destroy(ctx)
assert.NotNil(t, p)
- for _, w := range p.workers {
- w.state.value = StateErrored
+ for _, w := range p.Workers() {
+ w.State().Set(StateErrored)
}
- _, err = p.Exec(&Payload{Body: []byte("hello")})
+ _, err = p.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
}
// identical to replace but controlled on worker side
func Test_Static_Pool_Slow_Destroy(t *testing.T) {
p, err := NewPool(
+ context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
NewPipeFactory(),
- Config{
+ PoolConfig{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -410,61 +473,51 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, p)
- p.Destroy()
-}
-
-func Benchmark_Pool_Allocate(b *testing.B) {
- p, _ := NewPool(
- func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
- NewPipeFactory(),
- cfg,
- )
- defer p.Destroy()
-
- for n := 0; n < b.N; n++ {
- w, err := p.allocateWorker()
- if err != nil {
- b.Fail()
- log.Println(err)
- }
-
- p.free <- w
- }
+ p.Destroy(context.Background())
}
func Benchmark_Pool_Echo(b *testing.B) {
- p, _ := NewPool(
+ ctx := context.Background()
+ p, err := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
cfg,
)
- defer p.Destroy()
+ if err != nil {
+ b.Fatal(err)
+ }
+ b.ResetTimer()
+ b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
}
+//
func Benchmark_Pool_Echo_Batched(b *testing.B) {
+ ctx := context.Background()
p, _ := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- Config{
+ PoolConfig{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
},
)
- defer p.Destroy()
+ defer p.Destroy(ctx)
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -474,21 +527,26 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Wait()
}
+//
func Benchmark_Pool_Echo_Replaced(b *testing.B) {
+ ctx := context.Background()
p, _ := NewPool(
+ ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- Config{
+ PoolConfig{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
)
- defer p.Destroy()
+ defer p.Destroy(ctx)
+ b.ResetTimer()
+ b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(&Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}