summaryrefslogtreecommitdiff
path: root/pkg/pipe
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/pipe')
-rwxr-xr-xpkg/pipe/pipe_factory.go15
-rwxr-xr-xpkg/pipe/pipe_factory_test.go15
2 files changed, 17 insertions, 13 deletions
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go
index a0e0c258..f6211ab9 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -6,6 +6,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/pipe"
+ "github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
workerImpl "github.com/spiral/roadrunner/v2/pkg/worker"
@@ -14,14 +15,18 @@ import (
// Factory connects to stack using standard
// streams (STDIN, STDOUT pipes).
-type Factory struct{}
+type Factory struct {
+ listener []events.EventListener
+}
// NewPipeFactory returns new factory instance and starts
// listening
// todo: review tests
-func NewPipeFactory() worker.Factory {
- return &Factory{}
+func NewPipeFactory(listeners ...events.EventListener) worker.Factory {
+ return &Factory{
+ listener: listeners,
+ }
}
type SpawnResult struct {
@@ -35,7 +40,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (wo
c := make(chan SpawnResult)
const op = errors.Op("spawn worker with context")
go func() {
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listener...))
if err != nil {
c <- SpawnResult{
w: nil,
@@ -116,7 +121,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (wo
func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
const op = errors.Op("spawn worker")
- w, err := workerImpl.InitBaseWorker(cmd)
+ w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listener...))
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 0d548b7a..3cef0646 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -405,21 +405,20 @@ func Test_Echo_Slow(t *testing.T) {
func Test_Broken(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
-
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
- if err != nil {
- t.Fatal(err)
- }
-
data := ""
mu := &sync.Mutex{}
- w.AddListener(func(event interface{}) {
+ listener := func(event interface{}) {
if wev, ok := event.(events.WorkerEvent); ok {
mu.Lock()
data = string(wev.Payload.([]byte))
mu.Unlock()
}
- })
+ }
+
+ w, err := NewPipeFactory(listener).SpawnWorkerWithTimeout(ctx, cmd)
+ if err != nil {
+ t.Fatal(err)
+ }
syncWorker, err := workerImpl.From(w)
if err != nil {