diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 17:12:37 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-16 17:12:37 +0300 |
commit | f3491c089b4da77fd8d2bc942a88b6b8d117a8a5 (patch) | |
tree | 32bfffb1f24eeee7b909747cc00a6a6b9fd3ee83 /transport | |
parent | 5d2cd55ab522d4f1e65a833f91146444465a32ac (diff) |
Move plugins to a separate repository
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'transport')
-rw-r--r-- | transport/interface.go | 21 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory.go | 197 | ||||
-rw-r--r-- | transport/pipe/pipe_factory_spawn_test.go | 461 | ||||
-rwxr-xr-x | transport/pipe/pipe_factory_test.go | 503 | ||||
-rwxr-xr-x | transport/socket/socket_factory.go | 255 | ||||
-rw-r--r-- | transport/socket/socket_factory_spawn_test.go | 533 | ||||
-rwxr-xr-x | transport/socket/socket_factory_test.go | 622 |
7 files changed, 2592 insertions, 0 deletions
diff --git a/transport/interface.go b/transport/interface.go new file mode 100644 index 00000000..e20f2b0b --- /dev/null +++ b/transport/interface.go @@ -0,0 +1,21 @@ +package transport + +import ( + "context" + "os/exec" + + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/worker" +) + +// Factory is responsible for wrapping given command into tasks WorkerProcess. +type Factory interface { + // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. + // Process must not be started. + SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.Listener) (*worker.Process, error) + // SpawnWorker creates new WorkerProcess process based on given command. + // Process must not be started. + SpawnWorker(*exec.Cmd, ...events.Listener) (*worker.Process, error) + // Close the factory and underlying connections. + Close() error +} diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go new file mode 100755 index 00000000..0d46f496 --- /dev/null +++ b/transport/pipe/pipe_factory.go @@ -0,0 +1,197 @@ +package pipe + +import ( + "context" + "os/exec" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/pkg/pipe" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/worker" + "go.uber.org/multierr" +) + +// Factory connects to stack using standard +// streams (STDIN, STDOUT pipes). +type Factory struct{} + +// NewPipeFactory returns new factory instance and starts +// listening +func NewPipeFactory() *Factory { + return &Factory{} +} + +type sr struct { + w *worker.Process + err error +} + +// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, +// method Wait() must be handled on level above. +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit + spCh := make(chan sr) + const op = errors.Op("factory_spawn_worker_with_timeout") + go func() { + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + in, err := cmd.StdoutPipe() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + out, err := cmd.StdinPipe() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + pid, err := internal.FetchPID(relay) + if err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, err), + }: + return + default: + _ = w.Kill() + return + } + } + + if pid != w.Pid() { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), + }: + return + default: + _ = w.Kill() + return + } + } + + select { + case + // return worker + spCh <- sr{ + w: w, + err: nil, + }: + // everything ok, set ready state + w.State().Set(worker.StateReady) + return + default: + _ = w.Kill() + return + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-spCh: + if res.err != nil { + return nil, res.err + } + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker") + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + return nil, errors.E(op, err) + } + + in, err := cmd.StdoutPipe() + if err != nil { + return nil, errors.E(op, err) + } + + out, err := cmd.StdinPipe() + if err != nil { + return nil, errors.E(op, err) + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + return nil, errors.E(op, err) + } + + // errors bundle + if pid, err := internal.FetchPID(relay); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) + } + + // everything ok, set ready state + w.State().Set(worker.StateReady) + return w, nil +} + +// Close the factory. +func (f *Factory) Close() error { + return nil +} diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go new file mode 100644 index 00000000..20708d45 --- /dev/null +++ b/transport/pipe/pipe_factory_spawn_test.go @@ -0,0 +1,461 @@ +package pipe + +import ( + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" +) + +func Test_GetState2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, worker.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + assert.NoError(t, w.Stop()) +} + +func Test_Kill2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, worker.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError3(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError4(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/failboot.php") + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + w, err := NewPipeFactory().SpawnWorker(cmd, listener) + + assert.Nil(t, w) + assert.Error(t, err) + <-finish +} + +func Test_Pipe_Invalid2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/invalid.php") + w, err := NewPipeFactory().SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) +} + +func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorker(cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err = sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + data := "" + mu := &sync.Mutex{} + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + mu.Lock() + data = string(wev.Payload.([]byte)) + mu.Unlock() + } + } + + w, err := NewPipeFactory().SpawnWorker(cmd, listener) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + time.Sleep(time.Second * 3) + mu.Lock() + if strings.ContainsAny(data, "undefined_function()") == false { + t.Fail() + } + mu.Unlock() + assert.Error(t, w.Stop()) +} + +func Test_Error2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs2(t *testing.T) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(1), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(2), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(3), w.State().NumExecs()) +} diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go new file mode 100755 index 00000000..c999eadd --- /dev/null +++ b/transport/pipe/pipe_factory_test.go @@ -0,0 +1,503 @@ +package pipe + +import ( + "context" + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" +) + +func Test_GetState(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, worker.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Kill(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, worker.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError2(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + // error cause + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../../tests/failboot.php") + ctx := context.Background() + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + + assert.Nil(t, w) + assert.Error(t, err) + <-finish +} + +func Test_Pipe_Invalid(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../../tests/invalid.php") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) +} + +func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { + f := NewPipeFactory() + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err = sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "numExecs: 0") +} + +func Test_Echo_Slow(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "broken", "pipes") + data := "" + mu := &sync.Mutex{} + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + mu.Lock() + data = string(wev.Payload.([]byte)) + mu.Unlock() + } + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + time.Sleep(time.Second * 3) + mu.Lock() + if strings.ContainsAny(data, "undefined_function()") == false { + t.Fail() + } + mu.Unlock() + assert.Error(t, w.Stop()) +} + +func Test_Error(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(1), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(2), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(3), w.State().NumExecs()) +} diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go new file mode 100755 index 00000000..d98ce607 --- /dev/null +++ b/transport/socket/socket_factory.go @@ -0,0 +1,255 @@ +package socket + +import ( + "context" + "fmt" + "net" + "os/exec" + "sync" + "time" + + "github.com/shirou/gopsutil/process" + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/pkg/relay" + "github.com/spiral/goridge/v3/pkg/socket" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/worker" + + "go.uber.org/multierr" + "golang.org/x/sync/errgroup" +) + +// Factory connects to external stack using socket server. +type Factory struct { + // listens for incoming connections from underlying processes + ls net.Listener + + // relay connection timeout + tout time.Duration + + // sockets which are waiting for process association + relays sync.Map +} + +// NewSocketServer returns Factory attached to a given socket listener. +// tout specifies for how long factory should serve for incoming relay connection +func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { + f := &Factory{ + ls: ls, + tout: tout, + relays: sync.Map{}, + } + + // Be careful + // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go + // https://github.com/golang/go/issues/5045 + go func() { + err := f.listen() + // there is no logger here, use fmt + if err != nil { + fmt.Printf("[WARN]: socket server listen, error: %v\n", err) + } + }() + + return f +} + +// blocking operation, returns an error +func (f *Factory) listen() error { + errGr := &errgroup.Group{} + errGr.Go(func() error { + for { + conn, err := f.ls.Accept() + if err != nil { + return err + } + + rl := socket.NewSocketRelay(conn) + pid, err := internal.FetchPID(rl) + if err != nil { + return err + } + + f.attachRelayToPid(pid, rl) + } + }) + + return errGr.Wait() +} + +type socketSpawn struct { + w *worker.Process + err error +} + +// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker_with_timeout") + c := make(chan socketSpawn) + go func() { + ctxT, cancel := context.WithTimeout(ctx, f.tout) + defer cancel() + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + select { + case c <- socketSpawn{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + err = w.Start() + if err != nil { + select { + case c <- socketSpawn{ + w: nil, + err: errors.E(op, err), + }: + return + default: + return + } + } + + rl, err := f.findRelayWithContext(ctxT, w) + if err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + + select { + // try to write result + case c <- socketSpawn{ + w: nil, + err: errors.E(op, err), + }: + return + // if no receivers - return + default: + return + } + } + + w.AttachRelay(rl) + w.State().Set(worker.StateReady) + + select { + case c <- socketSpawn{ + w: w, + err: nil, + }: + return + default: + _ = w.Kill() + return + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-c: + if res.err != nil { + return nil, res.err + } + + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { + const op = errors.Op("factory_spawn_worker") + w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) + if err != nil { + return nil, err + } + + err = w.Start() + if err != nil { + return nil, errors.E(op, err) + } + + rl, err := f.findRelay(w) + if err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, err + } + + w.AttachRelay(rl) + + // errors bundle + if pid, err := internal.FetchPID(rl); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) + } + + w.State().Set(worker.StateReady) + + return w, nil +} + +// Close socket factory and underlying socket connection. +func (f *Factory) Close() error { + return f.ls.Close() +} + +// waits for Process to connect over socket and returns associated relay of timeout +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { + ticker := time.NewTicker(time.Millisecond * 10) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + // check for the process exists + _, err := process.NewProcess(int32(w.Pid())) + if err != nil { + return nil, err + } + default: + tmp, ok := f.relays.LoadAndDelete(w.Pid()) + if !ok { + continue + } + return tmp.(*socket.Relay), nil + } + } +} + +func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { + const op = errors.Op("factory_find_relay") + // poll every 1ms for the relay + pollDone := time.NewTimer(f.tout) + for { + select { + case <-pollDone.C: + return nil, errors.E(op, errors.Str("relay timeout")) + default: + tmp, ok := f.relays.Load(w.Pid()) + if !ok { + continue + } + return tmp.(*socket.Relay), nil + } + } +} + +// chan to store relay associated with specific pid +func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { + f.relays.Store(pid, relay) +} diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go new file mode 100644 index 00000000..aa5b811c --- /dev/null +++ b/transport/socket/socket_factory_spawn_test.go @@ -0,0 +1,533 @@ +package socket + +import ( + "net" + "os/exec" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" +) + +func Test_Tcp_Start2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartCloseFactory2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + f := NewSocketServer(ls, time.Minute) + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + w, err := f.SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartError2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err = cmd.Start() + if err != nil { + t.Errorf("error executing the command: error %v", err) + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Failboot2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/failboot.php") + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + assert.Nil(t, w) + assert.Error(t, err2) + <-finish +} + +func Test_Tcp_Invalid2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Broken2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + if err != nil { + t.Fatal(err) + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + errW := w.Wait() + assert.Error(t, errW) + }() + + defer func() { + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + assert.Error(t, err2) + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res) + wg.Wait() + <-finish +} + +func Test_Tcp_Echo2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Unix_Start2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Unix_Failboot2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/failboot.php") + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) + assert.Nil(t, w) + assert.Error(t, err) + <-finish +} + +func Test_Unix_Timeout2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "relay timeout") +} + +func Test_Unix_Invalid2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Unix_Broken2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + errC := ls.Close() + assert.NoError(t, errC) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) + if err != nil { + t.Fatal(err) + } + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + errW := w.Wait() + assert.Error(t, errW) + }() + + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) + wg.Wait() + <-finish +} + +func Test_Unix_Echo2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(b, err) + defer func() { + err = ls.Close() + assert.NoError(b, err) + }() + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := f.SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + go func() { + assert.NoError(b, w.Wait()) + }() + + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(b, err) + defer func() { + err = ls.Close() + assert.NoError(b, err) + }() + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { + defer func() { + _ = syscall.Unlink("sock.unix") + }() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + errC := ls.Close() + if errC != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := f.SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { + defer func() { + _ = syscall.Unlink("sock.unix") + }() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + errC := ls.Close() + if errC != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go new file mode 100755 index 00000000..36abecf9 --- /dev/null +++ b/transport/socket/socket_factory_test.go @@ -0,0 +1,622 @@ +package socket + +import ( + "context" + "net" + "os/exec" + "strings" + "sync" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" +) + +func Test_Tcp_Start(t *testing.T) { + ctx := context.Background() + time.Sleep(time.Millisecond * 10) // to ensure free socket + + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartCloseFactory(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + f := NewSocketServer(ls, time.Minute) + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartError(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + err = cmd.Start() + if err != nil { + t.Errorf("error executing the command: error %v", err) + } + + serv := NewSocketServer(ls, time.Minute) + time.Sleep(time.Second * 2) + w, err := serv.SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Failboot(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/failboot.php") + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + assert.Nil(t, w) + assert.Error(t, err2) + <-finish +} + +func Test_Tcp_Timeout(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "tcp", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} + +func Test_Tcp_Invalid(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Broken(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + if err != nil { + t.Fatal(err) + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + errW := w.Wait() + assert.Error(t, errW) + }() + + defer func() { + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + assert.Error(t, err2) + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res) + wg.Wait() + <-finish +} + +func Test_Tcp_Echo(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Unix_Start(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Unix_Failboot(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/failboot.php") + + finish := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) + assert.Nil(t, w) + assert.Error(t, err) + <-finish +} + +func Test_Unix_Timeout(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/slow-client.php", "echo", "unix", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} + +func Test_Unix_Invalid(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Unix_Broken(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") + + block := make(chan struct{}, 10) + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerStderr { + e := string(wev.Payload.([]byte)) + if strings.ContainsAny(e, "undefined_function()") { + block <- struct{}{} + return + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) + if err != nil { + t.Fatal(err) + } + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + errW := w.Wait() + assert.Error(t, errW) + }() + + defer func() { + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) + <-block + wg.Wait() +} + +func Test_Unix_Echo(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + go func() { + assert.NoError(b, w.Wait()) + }() + + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} |