diff options
Diffstat (limited to 'pkg/pipe')
-rwxr-xr-x | pkg/pipe/pipe_factory.go | 11 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 15 |
2 files changed, 12 insertions, 14 deletions
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index a0e0c258..ecb3fa71 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -6,6 +6,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/pipe" + "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" workerImpl "github.com/spiral/roadrunner/v2/pkg/worker" @@ -18,8 +19,6 @@ type Factory struct{} // NewPipeFactory returns new factory instance and starts // listening - -// todo: review tests func NewPipeFactory() worker.Factory { return &Factory{} } @@ -31,11 +30,11 @@ type SpawnResult struct { // SpawnWorker creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { c := make(chan SpawnResult) const op = errors.Op("spawn worker with context") go func() { - w, err := workerImpl.InitBaseWorker(cmd) + w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { c <- SpawnResult{ w: nil, @@ -114,9 +113,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (wo } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { const op = errors.Op("spawn worker") - w, err := workerImpl.InitBaseWorker(cmd) + w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { return nil, errors.E(op, err) } diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 0d548b7a..dca09375 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -405,21 +405,20 @@ func Test_Echo_Slow(t *testing.T) { func Test_Broken(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - data := "" mu := &sync.Mutex{} - w.AddListener(func(event interface{}) { + listener := func(event interface{}) { if wev, ok := event.(events.WorkerEvent); ok { mu.Lock() data = string(wev.Payload.([]byte)) mu.Unlock() } - }) + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + if err != nil { + t.Fatal(err) + } syncWorker, err := workerImpl.From(w) if err != nil { |