diff options
Diffstat (limited to 'pool/static_pool_test.go')
-rwxr-xr-x | pool/static_pool_test.go | 61 |
1 files changed, 25 insertions, 36 deletions
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index 9861f0d8..717d301e 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -18,6 +18,7 @@ import ( "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var cfg = &Config{ @@ -167,26 +168,18 @@ func Test_StaticPool_JobError(t *testing.T) { func Test_StaticPool_Broken_Replace(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } + + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") }, pipe.NewPipeFactory(), cfg, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -196,22 +189,23 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + event := <-ch + if !strings.Contains(event.Message(), "undefined_function()") { + t.Fatal("event should contain undefiled function()") + } p.Destroy(ctx) } func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx := context.Background() + // Run pool events - ev := make(chan struct{}, 1) - listener := func(event interface{}) { - if pe, ok := event.(events.PoolEvent); ok { - if pe.Event == events.EventWorkerConstruct { - ev <- struct{}{} - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch) + require.NoError(t, err) var cfg2 = &Config{ NumWorkers: 1, @@ -224,7 +218,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), cfg2, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -242,7 +235,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.Equal(t, 1, len(p.Workers())) // first creation - <-ev + <-ch // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill() if err != nil { @@ -250,7 +243,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } // re-creation - <-ev + <-ch list := p.Workers() for _, w := range list { @@ -496,15 +489,12 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx := context.Background() - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.PoolEvent); ok { - if ev.Event == events.EventNoFreeWorkers { - block <- struct{}{} - } - } - } + eb, id := events.Bus() + defer eb.Unsubscribe(id) + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch) + require.NoError(t, err) p, err := Initialize( ctx, @@ -518,7 +508,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { DestroyTimeout: time.Second, Supervisor: nil, }, - AddListeners(listener), ) assert.NoError(t, err) assert.NotNil(t, p) @@ -532,7 +521,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + <-ch p.Destroy(ctx) } |