summaryrefslogtreecommitdiff
path: root/pool/static_pool_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'pool/static_pool_test.go')
-rwxr-xr-xpool/static_pool_test.go61
1 files changed, 25 insertions, 36 deletions
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 9861f0d8..717d301e 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -18,6 +18,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
var cfg = &Config{
@@ -167,26 +168,18 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 10)
-
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerStderr {
- e := string(wev.Payload.([]byte))
- if strings.ContainsAny(e, "undefined_function()") {
- block <- struct{}{}
- return
- }
- }
- }
- }
+
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
pipe.NewPipeFactory(),
cfg,
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -196,22 +189,23 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+ event := <-ch
+ if !strings.Contains(event.Message(), "undefined_function()") {
+ t.Fatal("event should contain undefiled function()")
+ }
p.Destroy(ctx)
}
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
+
// Run pool events
- ev := make(chan struct{}, 1)
- listener := func(event interface{}) {
- if pe, ok := event.(events.PoolEvent); ok {
- if pe.Event == events.EventWorkerConstruct {
- ev <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
+ require.NoError(t, err)
var cfg2 = &Config{
NumWorkers: 1,
@@ -224,7 +218,6 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
cfg2,
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -242,7 +235,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Equal(t, 1, len(p.Workers()))
// first creation
- <-ev
+ <-ch
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill()
if err != nil {
@@ -250,7 +243,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
// re-creation
- <-ev
+ <-ch
list := p.Workers()
for _, w := range list {
@@ -496,15 +489,12 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx := context.Background()
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.PoolEvent); ok {
- if ev.Event == events.EventNoFreeWorkers {
- block <- struct{}{}
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
+ require.NoError(t, err)
p, err := Initialize(
ctx,
@@ -518,7 +508,6 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
DestroyTimeout: time.Second,
Supervisor: nil,
},
- AddListeners(listener),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -532,7 +521,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+ <-ch
p.Destroy(ctx)
}