summaryrefslogtreecommitdiff
path: root/transport/pipe
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
committerValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
commit9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch)
tree8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /transport/pipe
parent160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff)
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'transport/pipe')
-rwxr-xr-xtransport/pipe/pipe_factory.go9
-rw-r--r--transport/pipe/pipe_factory_spawn_test.go46
-rwxr-xr-xtransport/pipe/pipe_factory_test.go44
3 files changed, 44 insertions, 55 deletions
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go
index 3ea8fd98..c70b3f65 100755
--- a/transport/pipe/pipe_factory.go
+++ b/transport/pipe/pipe_factory.go
@@ -5,7 +5,6 @@ import (
"os/exec"
"github.com/spiral/goridge/v3/pkg/pipe"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
)
@@ -27,10 +26,10 @@ type sr struct {
// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) {
spCh := make(chan sr)
go func() {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
select {
case spCh <- sr{
@@ -130,8 +129,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) {
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
return nil, err
}
diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go
index 45b7aef8..81004027 100644
--- a/transport/pipe/pipe_factory_spawn_test.go
+++ b/transport/pipe/pipe_factory_spawn_test.go
@@ -12,6 +12,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_GetState2(t *testing.T) {
@@ -105,21 +106,20 @@ func Test_Pipe_PipeError4(t *testing.T) {
func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
- w, err := NewPipeFactory().SpawnWorker(cmd, listener)
+
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Pipe_Invalid2(t *testing.T) {
@@ -368,17 +368,13 @@ func Test_Echo_Slow2(t *testing.T) {
func Test_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- data := ""
- mu := &sync.Mutex{}
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- mu.Lock()
- data = string(wev.Payload.([]byte))
- mu.Unlock()
- }
- }
- w, err := NewPipeFactory().SpawnWorker(cmd, listener)
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
+
+ w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -390,11 +386,11 @@ func Test_Broken2(t *testing.T) {
assert.Nil(t, res)
time.Sleep(time.Second * 3)
- mu.Lock()
- if strings.ContainsAny(data, "undefined_function()") == false {
+
+ msg := <-ch
+ if strings.ContainsAny(msg.Message(), "undefined_function()") == false {
t.Fail()
}
- mu.Unlock()
assert.Error(t, w.Stop())
}
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index b4ba8c87..8c6d440a 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_GetState(t *testing.T) {
@@ -125,22 +126,20 @@ func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Pipe_Invalid(t *testing.T) {
@@ -433,17 +432,13 @@ func Test_Broken(t *testing.T) {
t.Parallel()
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- data := ""
- mu := &sync.Mutex{}
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- mu.Lock()
- data = string(wev.Payload.([]byte))
- mu.Unlock()
- }
- }
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
+ eb, id := events.Bus()
+ ch := make(chan events.Event, 10)
+ err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -455,11 +450,10 @@ func Test_Broken(t *testing.T) {
assert.Nil(t, res)
time.Sleep(time.Second * 3)
- mu.Lock()
- if strings.ContainsAny(data, "undefined_function()") == false {
+ msg := <-ch
+ if strings.ContainsAny(msg.Message(), "undefined_function()") == false {
t.Fail()
}
- mu.Unlock()
assert.Error(t, w.Stop())
}