summaryrefslogtreecommitdiff
path: root/static_pool_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'static_pool_test.go')
-rw-r--r--static_pool_test.go76
1 files changed, 47 insertions, 29 deletions
diff --git a/static_pool_test.go b/static_pool_test.go
index a2daedd6..b2ab4713 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -220,29 +220,36 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Nil(t, res.Context)
assert.Equal(t, "hello", res.String())
- assert.Equal(t, runtime.NumCPU(), len(p.Workers(ctx)))
+ assert.Equal(t, runtime.NumCPU(), len(p.Workers()))
// Consume pool events
+ wg := sync.WaitGroup{}
+ wg.Add(1)
go func() {
for true {
select {
case ev := <-p.Events():
fmt.Println(ev)
+ if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct {
+ wg.Done()
+ }
}
}
}()
// killing random worker and expecting pool to replace it
- err = p.Workers(ctx)[0].Kill(ctx)
+ err = p.Workers()[0].Kill(ctx)
if err != nil {
t.Errorf("error killing the process: error %v", err)
}
- time.Sleep(time.Second * 2)
+ wg.Wait()
- for _, w := range p.Workers(ctx) {
+ list := p.Workers()
+ for _, w := range list {
assert.Equal(t, StateReady, w.State().Value())
}
+ wg.Wait()
}
func Test_StaticPool_AllocateTimeout(t *testing.T) {
@@ -281,7 +288,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
assert.NotNil(t, p)
var lastPID string
- lastPID = strconv.Itoa(int(p.Workers(ctx)[0].Pid()))
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
res, _ := p.Exec(ctx, Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
@@ -318,8 +325,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
assert.NotNil(t, p)
+ go func() {
+ for {
+ select {
+ case ev := <-p.Events():
+ fmt.Println(ev)
+ }
+ }
+ }()
+
var lastPID string
- lastPID = strconv.Itoa(int(p.Workers(ctx)[0].Pid()))
+ lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
if err != nil {
@@ -395,29 +411,31 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
}
// identical to replace but controlled on worker side
-// TODO inconsistent state
-//func Test_Static_Pool_Handle_Dead(t *testing.T) {
-// p, err := NewPool(
-// func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
-// NewPipeFactory(),
-// Config{
-// NumWorkers: 5,
-// AllocateTimeout: time.Second,
-// DestroyTimeout: time.Second,
-// },
-// )
-// assert.NoError(t, err)
-// defer p.Destroy()
-//
-// assert.NotNil(t, p)
-//
-// for _, w := range p.stack {
-// w.state.value = StateErrored
-// }
-//
-// _, err = p.Exec(&Payload{Body: []byte("hello")})
-// assert.Error(t, err)
-//}
+func Test_Static_Pool_Handle_Dead(t *testing.T) {
+ ctx := context.Background()
+ p, err := NewPool(
+ context.Background(),
+ func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
+ NewPipeFactory(),
+ &Config{
+ NumWorkers: 5,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ ExecTTL: time.Second * 5,
+ },
+ )
+ assert.NoError(t, err)
+ defer p.Destroy(ctx)
+
+ assert.NotNil(t, p)
+
+ for _, w := range p.Workers() {
+ w.State().Set(StateErrored)
+ }
+
+ _, err = p.Exec(ctx, Payload{Body: []byte("hello")})
+ assert.Error(t, err)
+}
// identical to replace but controlled on worker side
func Test_Static_Pool_Slow_Destroy(t *testing.T) {