diff options
Diffstat (limited to 'transport')
-rw-r--r-- | transport/interface.go | 5 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory.go | 9 | ||||
-rw-r--r-- | transport/pipe/pipe_factory_spawn_test.go | 46 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 44 | ||||
-rwxr-xr-x | transport/socket/socket_factory.go | 9 | ||||
-rw-r--r-- | transport/socket/socket_factory_spawn_test.go | 89 | ||||
-rwxr-xr-x | transport/socket/socket_factory_test.go | 92 |
7 files changed, 133 insertions, 161 deletions
diff --git a/transport/interface.go b/transport/interface.go index e20f2b0b..0d6c8e8b 100644 --- a/transport/interface.go +++ b/transport/interface.go @@ -4,7 +4,6 @@ import ( "context" "os/exec" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/worker" ) @@ -12,10 +11,10 @@ import ( type Factory interface { // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) + SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (*worker.Process, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. - SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) + SpawnWorker(*exec.Cmd) (*worker.Process, error) // Close the factory and underlying connections. Close() error } diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go index 3ea8fd98..c70b3f65 100755 --- a/transport/pipe/pipe_factory.go +++ b/transport/pipe/pipe_factory.go @@ -5,7 +5,6 @@ import ( "os/exec" "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" ) @@ -27,10 +26,10 @@ type sr struct { // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { spCh := make(chan sr) go func() { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd) if err != nil { select { case spCh <- sr{ @@ -130,8 +129,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd) if err != nil { return nil, err } diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go index 45b7aef8..81004027 100644 --- a/transport/pipe/pipe_factory_spawn_test.go +++ b/transport/pipe/pipe_factory_spawn_test.go @@ -12,6 +12,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_GetState2(t *testing.T) { @@ -105,21 +106,20 @@ func Test_Pipe_PipeError4(t *testing.T) { func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } - w, err := NewPipeFactory().SpawnWorker(cmd, listener) + + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Pipe_Invalid2(t *testing.T) { @@ -368,17 +368,13 @@ func Test_Echo_Slow2(t *testing.T) { func Test_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - w, err := NewPipeFactory().SpawnWorker(cmd, listener) + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -390,11 +386,11 @@ func Test_Broken2(t *testing.T) { assert.Nil(t, res) time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { + + msg := <-ch + if strings.ContainsAny(msg.Message(), "undefined_function()") == false { t.Fail() } - mu.Unlock() assert.Error(t, w.Stop()) } diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go index b4ba8c87..8c6d440a 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/transport/pipe/pipe_factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_GetState(t *testing.T) { @@ -125,22 +126,20 @@ func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") ctx := context.Background() - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Pipe_Invalid(t *testing.T) { @@ -433,17 +432,13 @@ func Test_Broken(t *testing.T) { t.Parallel() ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - data := "" - mu := &sync.Mutex{} - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - mu.Lock() - data = string(wev.Payload.([]byte)) - mu.Unlock() - } - } - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -455,11 +450,10 @@ func Test_Broken(t *testing.T) { assert.Nil(t, res) time.Sleep(time.Second * 3) - mu.Lock() - if strings.ContainsAny(data, "undefined_function()") == false { + msg := <-ch + if strings.ContainsAny(msg.Message(), "undefined_function()") == false { t.Fail() } - mu.Unlock() assert.Error(t, w.Stop()) } diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go index dfffdf4e..06d7000d 100755 --- a/transport/socket/socket_factory.go +++ b/transport/socket/socket_factory.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/goridge/v3/pkg/socket" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" @@ -83,12 +82,12 @@ type socketSpawn struct { } // SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { c := make(chan socketSpawn) go func() { ctxT, cancel := context.WithTimeout(ctx, f.tout) defer cancel() - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + w, err := worker.InitBaseWorker(cmd) if err != nil { select { case c <- socketSpawn{ @@ -157,8 +156,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd) if err != nil { return nil, err } diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go index 363a3510..45fb3bd5 100644 --- a/transport/socket/socket_factory_spawn_test.go +++ b/transport/socket/socket_factory_spawn_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_Tcp_Start2(t *testing.T) { @@ -110,21 +111,19 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err2) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Tcp_Invalid2(t *testing.T) { @@ -162,18 +161,12 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -198,7 +191,11 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function() string") + } } func Test_Tcp_Echo2(t *testing.T) { @@ -273,21 +270,19 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Unix_Timeout2(t *testing.T) { @@ -331,18 +326,12 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -367,7 +356,11 @@ func Test_Unix_Broken2(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } } func Test_Unix_Echo2(t *testing.T) { diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go index d517d026..11b34999 100755 --- a/transport/socket/socket_factory_test.go +++ b/transport/socket/socket_factory_test.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/roadrunner/v2/payload" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func Test_Tcp_Start(t *testing.T) { @@ -124,21 +125,19 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err2) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Tcp_Timeout(t *testing.T) { @@ -203,18 +202,12 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -239,7 +232,11 @@ func Test_Tcp_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) wg.Wait() - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } } func Test_Tcp_Echo(t *testing.T) { @@ -368,21 +365,19 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - finish := make(chan struct{}, 10) - listener := func(event interface{}) { - if ev, ok := event.(events.WorkerEvent); ok { - if ev.Event == events.EventWorkerStderr { - if strings.Contains(string(ev.Payload.([]byte)), "failboot") { - finish <- struct{}{} - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) - <-finish + + ev := <-ch + if !strings.Contains(ev.Message(), "failboot") { + t.Fatal("should contain failboot string") + } } func Test_Unix_Timeout(t *testing.T) { @@ -444,20 +439,12 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - block := make(chan struct{}, 10) - listener := func(event interface{}) { - if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerStderr { - e := string(wev.Payload.([]byte)) - if strings.ContainsAny(e, "undefined_function()") { - block <- struct{}{} - return - } - } - } - } + eb, id := events.Bus() + ch := make(chan events.Event, 10) + err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch) + require.NoError(t, err) - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -481,7 +468,12 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res) - <-block + + ev := <-ch + if !strings.Contains(ev.Message(), "undefined_function()") { + t.Fatal("should contain undefined_function string") + } + wg.Wait() } |