summaryrefslogtreecommitdiff
path: root/transport/socket
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-27 22:50:03 +0300
committerGitHub <[email protected]>2021-10-27 22:50:03 +0300
commitc8c3f9f113eae13aa37cf92043b288bb0c68a622 (patch)
tree42f8ab386735d5f8b002907d07249e94b4c10a12 /transport/socket
parent1f62e21020cc3014e9eb2dc33c154de6dd5b22d5 (diff)
parentab591e7f122e28857cef00c905a8125992ea3cdf (diff)
[#838]: feat(events): events package deep refactoringv2.6.0-alpha.1
[#838]: feat(events): events package deep refactoring
Diffstat (limited to 'transport/socket')
-rwxr-xr-xtransport/socket/socket_factory.go9
-rw-r--r--transport/socket/socket_factory_spawn_test.go93
-rwxr-xr-xtransport/socket/socket_factory_test.go96
3 files changed, 95 insertions, 103 deletions
diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go
index dfffdf4e..06d7000d 100755
--- a/transport/socket/socket_factory.go
+++ b/transport/socket/socket_factory.go
@@ -12,7 +12,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/goridge/v3/pkg/socket"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
@@ -83,12 +82,12 @@ type socketSpawn struct {
}
// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) {
c := make(chan socketSpawn)
go func() {
ctxT, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
select {
case c <- socketSpawn{
@@ -157,8 +156,8 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
+func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) {
+ w, err := worker.InitBaseWorker(cmd)
if err != nil {
return nil, err
}
diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go
index 363a3510..2db2fd40 100644
--- a/transport/socket/socket_factory_spawn_test.go
+++ b/transport/socket/socket_factory_spawn_test.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_Tcp_Start2(t *testing.T) {
@@ -110,21 +111,20 @@ func Test_Tcp_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err2)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Tcp_Invalid2(t *testing.T) {
@@ -162,18 +162,13 @@ func Test_Tcp_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -198,7 +193,11 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
wg.Wait()
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function() string")
+ }
}
func Test_Tcp_Echo2(t *testing.T) {
@@ -273,21 +272,20 @@ func Test_Unix_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Unix_Timeout2(t *testing.T) {
@@ -331,18 +329,13 @@ func Test_Unix_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -367,7 +360,11 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
wg.Wait()
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function string")
+ }
}
func Test_Unix_Echo2(t *testing.T) {
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index d517d026..7b28a847 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func Test_Tcp_Start(t *testing.T) {
@@ -124,21 +125,20 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Tcp_Timeout(t *testing.T) {
@@ -203,18 +203,13 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -239,7 +234,11 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
wg.Wait()
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function string")
+ }
}
func Test_Tcp_Echo(t *testing.T) {
@@ -368,21 +367,20 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- finish := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if ev, ok := event.(events.WorkerEvent); ok {
- if ev.Event == events.EventWorkerStderr {
- if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
- finish <- struct{}{}
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
- <-finish
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "failboot") {
+ t.Fatal("should contain failboot string")
+ }
}
func Test_Unix_Timeout(t *testing.T) {
@@ -444,20 +442,13 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- block := make(chan struct{}, 10)
- listener := func(event interface{}) {
- if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerStderr {
- e := string(wev.Payload.([]byte))
- if strings.ContainsAny(e, "undefined_function()") {
- block <- struct{}{}
- return
- }
- }
- }
- }
+ eb, id := events.Bus()
+ defer eb.Unsubscribe(id)
+ ch := make(chan events.Event, 10)
+ err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ require.NoError(t, err)
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -481,7 +472,12 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-block
+
+ ev := <-ch
+ if !strings.Contains(ev.Message(), "undefined_function()") {
+ t.Fatal("should contain undefined_function string")
+ }
+
wg.Wait()
}