diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 21:46:50 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-09-16 21:46:50 +0300 |
commit | 3581b45f237a3f7aa29591ceb2bf6f4a4642a2f5 (patch) | |
tree | e723b19ec1ac16b7ccc7b3c2da69d4a416d63d81 /transport/pipe | |
parent | 337d292dd2d6ff0a555098b1970d8194d8df8bc2 (diff) | |
parent | 823d831b57b75f70c7c3bbbee355f2016633bb3b (diff) |
[#803]: feat(plugins): move plugins to a separate repositoryv2.5.0-alpha.2
[#803]: feat(plugins): move plugins to a separate repository
Diffstat (limited to 'transport/pipe')
-rwxr-xr-x | transport/pipe/pipe_factory.go | 197 | ||||
-rw-r--r-- | transport/pipe/pipe_factory_spawn_test.go | 461 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 503 |
3 files changed, 1161 insertions, 0 deletions
diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go new file mode 100755 index 00000000..0d46f496 --- /dev/null +++ b/transport/pipe/pipe_factory.go @@ -0,0 +1,197 @@ +package pipe + +import ( + "context" + "os/exec" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/pkg/pipe" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/worker" + "go.uber.org/multierr" +) + +// Factory connects to stack using standard +// streams (STDIN, STDOUT pipes). +type Factory struct{} + +// NewPipeFactory returns new factory instance and starts +// listening +func NewPipeFactory() *Factory { + return &Factory{} +} + +type sr struct { + w *worker.Process + err error +} + +// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, +// method Wait() must be handled on level above. +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit + spCh := make(chan sr) + const op = errors.Op("factory_spawn_worker_with_timeout") + go func() { + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + in, err := cmd.StdoutPipe() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + out, err := cmd.StdinPipe() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + pid, err := internal.FetchPID(relay) + if err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + _ = w.Kill() + return + } + } + + if pid != w.Pid() { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), + }: + return + default: + _ = w.Kill() + return + } + } + + select { + case + // return worker + spCh <- sr{ + w: w, + err: nil, + }: + // everything ok, set ready state + w.State().Set(worker.StateReady) + return + default: + _ = w.Kill() + return + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-spCh: + if res.err != nil { + return nil, res.err + } + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker") + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + return nil, errors.E(op, err) + } + + in, err := cmd.StdoutPipe() + if err != nil { + return nil, errors.E(op, err) + } + + out, err := cmd.StdinPipe() + if err != nil { + return nil, errors.E(op, err) + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + return nil, errors.E(op, err) + } + + // errors bundle + if pid, err := internal.FetchPID(relay); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) + } + + // everything ok, set ready state + w.State().Set(worker.StateReady) + return w, nil +} + +// Close the factory. +func (f *Factory) Close() error { + return nil +} diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go new file mode 100644 index 00000000..45b7aef8 --- /dev/null +++ b/transport/pipe/pipe_factory_spawn_test.go @@ -0,0 +1,461 @@ +package pipe + +import ( + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" +) + +func Test_GetState2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, worker.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + assert.NoError(t, w.Stop()) +} + +func Test_Kill2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, worker.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError3(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError4(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot2(t *testing.T) { + cmd := exec.Command("php", "../../tests/failboot.php") + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + w, err := NewPipeFactory().SpawnWorker(cmd, listener) + + assert.Nil(t, w) + assert.Error(t, err) + <-finish +} + +func Test_Pipe_Invalid2(t *testing.T) { + cmd := exec.Command("php", "../../tests/invalid.php") + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) +} + +func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorker(cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err = sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow2(t *testing.T) { + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + data := "" + mu := &sync.Mutex{} + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + mu.Lock() + data = string(wev.Payload.([]byte)) + mu.Unlock() + } + } + + w, err := NewPipeFactory().SpawnWorker(cmd, listener) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + time.Sleep(time.Second * 3) + mu.Lock() + if strings.ContainsAny(data, "undefined_function()") == false { + t.Fail() + } + mu.Unlock() + assert.Error(t, w.Stop()) +} + +func Test_Error2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(1), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(2), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(3), w.State().NumExecs()) +} diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go new file mode 100755 index 00000000..f8198610 --- /dev/null +++ b/transport/pipe/pipe_factory_test.go @@ -0,0 +1,503 @@ +package pipe + +import ( + "context" + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" +) + +func Test_GetState(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, worker.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Kill(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, worker.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError2(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + // error cause + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/failboot.php") + ctx := context.Background() + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + + assert.Nil(t, w) + assert.Error(t, err) + <-finish +} + +func Test_Pipe_Invalid(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/invalid.php") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) +} + +func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err = sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + data := "" + mu := &sync.Mutex{} + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + mu.Lock() + data = string(wev.Payload.([]byte)) + mu.Unlock() + } + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + time.Sleep(time.Second * 3) + mu.Lock() + if strings.ContainsAny(data, "undefined_function()") == false { + t.Fail() + } + mu.Unlock() + assert.Error(t, w.Stop()) +} + +func Test_Error(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(1), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(2), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(3), w.State().NumExecs()) +} |