From 215c7c91937bf65704db18a59a327a3c64e43530 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 12 Jan 2022 00:19:48 +0300 Subject: pass logger from the factory Signed-off-by: Valery Piashchynski --- Makefile | 8 +- go.sum | 2 - ipc/interface.go | 20 + ipc/pipe/pipe_factory.go | 178 ++++++++ ipc/pipe/pipe_factory_spawn_test.go | 430 ++++++++++++++++++ ipc/pipe/pipe_factory_test.go | 511 +++++++++++++++++++++ ipc/socket/socket_factory.go | 252 +++++++++++ ipc/socket/socket_factory_spawn_test.go | 485 ++++++++++++++++++++ ipc/socket/socket_factory_test.go | 609 ++++++++++++++++++++++++++ pool/static_pool.go | 8 +- pool/static_pool_test.go | 60 +-- pool/supervisor_test.go | 22 +- transport/interface.go | 20 - transport/pipe/pipe_factory.go | 173 -------- transport/pipe/pipe_factory_spawn_test.go | 427 ------------------ transport/pipe/pipe_factory_test.go | 511 --------------------- transport/socket/socket_factory.go | 248 ----------- transport/socket/socket_factory_spawn_test.go | 482 -------------------- transport/socket/socket_factory_test.go | 609 -------------------------- worker/worker.go | 2 +- 20 files changed, 2536 insertions(+), 2521 deletions(-) create mode 100644 ipc/interface.go create mode 100755 ipc/pipe/pipe_factory.go create mode 100644 ipc/pipe/pipe_factory_spawn_test.go create mode 100755 ipc/pipe/pipe_factory_test.go create mode 100755 ipc/socket/socket_factory.go create mode 100644 ipc/socket/socket_factory_spawn_test.go create mode 100755 ipc/socket/socket_factory_test.go delete mode 100644 transport/interface.go delete mode 100755 transport/pipe/pipe_factory.go delete mode 100644 transport/pipe/pipe_factory_spawn_test.go delete mode 100755 transport/pipe/pipe_factory_test.go delete mode 100755 transport/socket/socket_factory.go delete mode 100644 transport/socket/socket_factory_spawn_test.go delete mode 100755 transport/socket/socket_factory_test.go diff --git a/Makefile b/Makefile index d2f4ebc6..67199bb0 100644 --- a/Makefile +++ b/Makefile @@ -7,8 +7,8 @@ SHELL = /bin/sh test_coverage: rm -rf coverage-ci mkdir ./coverage-ci - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./transport/pipe - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./transport/socket + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./ipc/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./ipc/socket go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.out -covermode=atomic ./pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.out -covermode=atomic ./bst @@ -19,8 +19,8 @@ test_coverage: tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt test: ## Run application tests - go test -v -race -tags=debug ./transport/pipe - go test -v -race -tags=debug ./transport/socket + go test -v -race -tags=debug ./ipc/pipe + go test -v -race -tags=debug ./ipc/socket go test -v -race -tags=debug ./pool go test -v -race -tags=debug ./worker go test -v -race -tags=debug ./worker_watcher diff --git a/go.sum b/go.sum index 4e748829..744796fc 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,6 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed h1:d5glpD+GMms2DMbu1doSYibjbKasYNvnhq885nOnRz8= -golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/ipc/interface.go b/ipc/interface.go new file mode 100644 index 00000000..1d70cd21 --- /dev/null +++ b/ipc/interface.go @@ -0,0 +1,20 @@ +package ipc + +import ( + "context" + "os/exec" + + "github.com/spiral/roadrunner/v2/worker" +) + +// Factory is responsible for wrapping given command into tasks WorkerProcess. +type Factory interface { + // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. + // Process must not be started. + SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (*worker.Process, error) + // SpawnWorker creates new WorkerProcess process based on given command. + // Process must not be started. + SpawnWorker(*exec.Cmd) (*worker.Process, error) + // Close the factory and underlying connections. + Close() error +} diff --git a/ipc/pipe/pipe_factory.go b/ipc/pipe/pipe_factory.go new file mode 100755 index 00000000..4a3c9a67 --- /dev/null +++ b/ipc/pipe/pipe_factory.go @@ -0,0 +1,178 @@ +package pipe + +import ( + "context" + "os/exec" + + "github.com/spiral/goridge/v3/pkg/pipe" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/worker" + "go.uber.org/zap" +) + +// Factory connects to stack using standard +// streams (STDIN, STDOUT pipes). +type Factory struct { + log *zap.Logger +} + +// NewPipeFactory returns new factory instance and starts +// listening +func NewPipeFactory(log *zap.Logger) *Factory { + return &Factory{ + log: log, + } +} + +type sr struct { + w *worker.Process + err error +} + +// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, +// method Wait() must be handled on level above. +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { + spCh := make(chan sr) + go func() { + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + return + } + } + + in, err := cmd.StdoutPipe() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + return + } + } + + out, err := cmd.StdinPipe() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + return + } + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + return + } + } + + // used as a ping + _, err = internal.Pid(relay) + if err != nil { + _ = w.Kill() + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + _ = w.Kill() + return + } + } + + select { + case + // return worker + spCh <- sr{ + w: w, + err: nil, + }: + // everything ok, set ready state + w.State().Set(worker.StateReady) + return + default: + _ = w.Kill() + return + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-spCh: + if res.err != nil { + return nil, res.err + } + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) + if err != nil { + return nil, err + } + + in, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + + out, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + return nil, err + } + + // errors bundle + _, err = internal.Pid(relay) + if err != nil { + _ = w.Kill() + return nil, err + } + + // everything ok, set ready state + w.State().Set(worker.StateReady) + return w, nil +} + +// Close the factory. +func (f *Factory) Close() error { + return nil +} diff --git a/ipc/pipe/pipe_factory_spawn_test.go b/ipc/pipe/pipe_factory_spawn_test.go new file mode 100644 index 00000000..2ce5a257 --- /dev/null +++ b/ipc/pipe/pipe_factory_spawn_test.go @@ -0,0 +1,430 @@ +package pipe + +import ( + "os/exec" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +var log = zap.NewNop() + +func Test_GetState2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory(log).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, worker.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + assert.NoError(t, w.Stop()) +} + +func Test_Kill2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory(log).SpawnWorker(cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, worker.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory(log).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + w, err := NewPipeFactory(log).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError3(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory(log).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError4(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + w, err := NewPipeFactory(log).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot2(t *testing.T) { + cmd := exec.Command("php", "../../tests/failboot.php") + w, err := NewPipeFactory(log).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) +} + +func Test_Pipe_Invalid2(t *testing.T) { + cmd := exec.Command("php", "../../tests/invalid.php") + w, err := NewPipeFactory(log).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory(log).SpawnWorker(cmd) + assert.NoError(t, err) + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + go func() { + if w.Wait() != nil { + t.Fail() + } + }() + + assert.Equal(t, "hello", res.String()) + err = w.Stop() + assert.NoError(t, err) +} + +func Test_Pipe_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + w, err := NewPipeFactory(log).SpawnWorker(cmd) + assert.NoError(t, err) + require.NotNil(t, w) + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res) + + time.Sleep(time.Second) + err = w.Stop() + assert.Error(t, err) +} + +func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { + f := NewPipeFactory(log) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorker(cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorker(cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory(log).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, err := NewPipeFactory(log).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory(log).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err = sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorker(cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{}) + assert.Error(t, err) + assert.Nil(t, res) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "num_execs: 0") +} + +func Test_Echo_Slow2(t *testing.T) { + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory(log).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + + w, err := NewPipeFactory(log).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + time.Sleep(time.Second * 3) + assert.Error(t, w.Stop()) +} + +func Test_Error2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs2(t *testing.T) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(1), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(2), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(3), w.State().NumExecs()) +} diff --git a/ipc/pipe/pipe_factory_test.go b/ipc/pipe/pipe_factory_test.go new file mode 100755 index 00000000..025498b5 --- /dev/null +++ b/ipc/pipe/pipe_factory_test.go @@ -0,0 +1,511 @@ +package pipe + +import ( + "context" + "os/exec" + "sync" + "testing" + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_GetState(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + assert.Equal(t, worker.StateStopped, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Kill(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + assert.Error(t, w.Wait()) + assert.Equal(t, worker.StateErrored, w.State().Value()) + }() + + assert.NoError(t, err) + assert.NotNil(t, w) + + assert.Equal(t, worker.StateReady, w.State().Value()) + err = w.Kill() + if err != nil { + t.Errorf("error killing the Process: error %v", err) + } + wg.Wait() +} + +func Test_Pipe_Start(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + assert.NoError(t, w.Stop()) +} + +func Test_Pipe_StartError(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + err := cmd.Start() + if err != nil { + t.Errorf("error running the command: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_PipeError2(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + // error cause + _, err := cmd.StdinPipe() + if err != nil { + t.Errorf("error creating the STDIN pipe: error %v", err) + } + + ctx := context.Background() + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Failboot(t *testing.T) { + cmd := exec.Command("php", "../../tests/failboot.php") + ctx := context.Background() + + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + + assert.Nil(t, w) + assert.Error(t, err) +} + +func Test_Pipe_Invalid(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/invalid.php") + ctx := context.Background() + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Pipe_Echo(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + go func() { + if w.Wait() != nil { + t.Fail() + } + }() + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Echo_Script(t *testing.T) { + t.Parallel() + cmd := exec.Command("sh", "../../tests/pipes_test_script.sh") + ctx := context.Background() + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + go func() { + if w.Wait() != nil { + t.Fail() + } + }() + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pipe_Broken(t *testing.T) { + t.Parallel() + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + ctx := context.Background() + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + require.NoError(t, err) + require.NotNil(t, w) + + go func() { + errW := w.Wait() + require.Error(t, errW) + }() + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res) + + time.Sleep(time.Second) + err = w.Stop() + assert.NoError(t, err) +} + +func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { + f := NewPipeFactory(log) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) + go func() { + if w.Wait() != nil { + b.Fail() + } + }() + + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(context.Background(), cmd) + sw := worker.From(w) + + b.ReportAllocs() + b.ResetTimer() + go func() { + err := w.Wait() + if err != nil { + b.Errorf("error waiting the worker: error %v", err) + } + }() + defer func() { + err := w.Stop() + if err != nil { + b.Errorf("error stopping the worker: error %v", err) + } + }() + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Test_Echo(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err = sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_BadPayload(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + + sw := worker.From(w) + + go func() { + assert.NoError(t, sw.Wait()) + }() + defer func() { + err := sw.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + res, err := sw.Exec(&payload.Payload{}) + + assert.Error(t, err) + assert.Nil(t, res) + + assert.Contains(t, err.Error(), "payload can not be empty") +} + +func Test_String(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") + assert.Contains(t, w.String(), "ready") + assert.Contains(t, w.String(), "num_execs: 0") +} + +func Test_Echo_Slow(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") + + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Nil(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Broken(t *testing.T) { + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") + + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + time.Sleep(time.Second * 3) + assert.Error(t, w.Stop()) +} + +func Test_Error(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.NotNil(t, err) + assert.Nil(t, res) + + if errors.Is(errors.SoftJob, err) == false { + t.Fatal("error should be of type errors.ErrSoftJob") + } + assert.Contains(t, err.Error(), "hello") +} + +func Test_NumExecs(t *testing.T) { + t.Parallel() + ctx := context.Background() + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err := w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(1), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(2), w.State().NumExecs()) + + _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) + if err != nil { + t.Errorf("fail to execute payload: error %v", err) + } + assert.Equal(t, uint64(3), w.State().NumExecs()) +} diff --git a/ipc/socket/socket_factory.go b/ipc/socket/socket_factory.go new file mode 100755 index 00000000..aa356424 --- /dev/null +++ b/ipc/socket/socket_factory.go @@ -0,0 +1,252 @@ +package socket + +import ( + "context" + "fmt" + "net" + "os/exec" + "sync" + "time" + + "github.com/shirou/gopsutil/process" + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/pkg/relay" + "github.com/spiral/goridge/v3/pkg/socket" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/worker" + "go.uber.org/zap" + + "golang.org/x/sync/errgroup" +) + +// Factory connects to external stack using socket server. +type Factory struct { + // listens for incoming connections from underlying processes + ls net.Listener + + // relay connection timeout + tout time.Duration + + // sockets which are waiting for process association + relays sync.Map + + log *zap.Logger +} + +// NewSocketServer returns Factory attached to a given socket listener. +// tout specifies for how long factory should serve for incoming relay connection +func NewSocketServer(ls net.Listener, tout time.Duration, log *zap.Logger) *Factory { + f := &Factory{ + ls: ls, + tout: tout, + relays: sync.Map{}, + log: log, + } + + // Be careful + // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go + // https://github.com/golang/go/issues/5045 + go func() { + err := f.listen() + // there is no logger here, use fmt + if err != nil { + fmt.Printf("[WARN]: socket server listen, error: %v\n", err) + } + }() + + return f +} + +// blocking operation, returns an error +func (f *Factory) listen() error { + errGr := &errgroup.Group{} + errGr.Go(func() error { + for { + conn, err := f.ls.Accept() + if err != nil { + return err + } + + rl := socket.NewSocketRelay(conn) + pid, err := internal.Pid(rl) + if err != nil { + return err + } + + f.attachRelayToPid(pid, rl) + } + }) + + return errGr.Wait() +} + +type socketSpawn struct { + w *worker.Process + err error +} + +// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { + c := make(chan socketSpawn) + go func() { + ctxT, cancel := context.WithTimeout(ctx, f.tout) + defer cancel() + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) + if err != nil { + select { + case c <- socketSpawn{ + w: nil, + err: err, + }: + return + default: + return + } + } + + err = w.Start() + if err != nil { + select { + case c <- socketSpawn{ + w: nil, + err: err, + }: + return + default: + return + } + } + + rl, err := f.findRelayWithContext(ctxT, w) + if err != nil { + _ = w.Kill() + select { + // try to write result + case c <- socketSpawn{ + w: nil, + err: err, + }: + return + // if no receivers - return + default: + return + } + } + + w.AttachRelay(rl) + w.State().Set(worker.StateReady) + + select { + case c <- socketSpawn{ + w: w, + err: nil, + }: + return + default: + _ = w.Kill() + return + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-c: + if res.err != nil { + return nil, res.err + } + + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) + if err != nil { + return nil, err + } + + err = w.Start() + if err != nil { + return nil, err + } + + rl, err := f.findRelay(w) + if err != nil { + _ = w.Kill() + return nil, err + } + + w.AttachRelay(rl) + + // errors bundle + _, err = internal.Pid(rl) + if err != nil { + _ = w.Kill() + return nil, err + } + + w.State().Set(worker.StateReady) + + return w, nil +} + +// Close socket factory and underlying socket connection. +func (f *Factory) Close() error { + return f.ls.Close() +} + +// waits for Process to connect over socket and returns associated relay of timeout +func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { + ticker := time.NewTicker(time.Millisecond * 10) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-ticker.C: + // check for the process exists + _, err := process.NewProcess(int32(w.Pid())) + if err != nil { + return nil, err + } + default: + // find first pid and attach relay to it + var r *socket.Relay + f.relays.Range(func(k, val interface{}) bool { + r = val.(*socket.Relay) + f.relays.Delete(k) + return false + }) + + // no relay exists + if r == nil { + continue + } + + return r, nil + } + } +} + +func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { + const op = errors.Op("factory_find_relay") + // poll every 1ms for the relay + pollDone := time.NewTimer(f.tout) + for { + select { + case <-pollDone.C: + return nil, errors.E(op, errors.Str("relay timeout")) + default: + tmp, ok := f.relays.Load(w.Pid()) + if !ok { + continue + } + return tmp.(*socket.Relay), nil + } + } +} + +// chan to store relay associated with specific pid +func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { + f.relays.Store(pid, relay) +} diff --git a/ipc/socket/socket_factory_spawn_test.go b/ipc/socket/socket_factory_spawn_test.go new file mode 100644 index 00000000..36c6cce2 --- /dev/null +++ b/ipc/socket/socket_factory_spawn_test.go @@ -0,0 +1,485 @@ +package socket + +import ( + "net" + "os/exec" + "sync" + "syscall" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +var log = zap.NewNop() + +func Test_Tcp_Start2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartCloseFactory2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + f := NewSocketServer(ls, time.Minute, log) + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + w, err := f.SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + require.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartError2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + err = cmd.Start() + if err != nil { + t.Errorf("error executing the command: error %v", err) + } + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Failboot2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/failboot.php") + + w, err2 := NewSocketServer(ls, time.Second*5, log).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err2) +} + +func Test_Tcp_Invalid2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*1, log).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Broken2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + errW := w.Wait() + assert.Error(t, errW) + }() + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res) + wg.Wait() + + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + // but process exited + assert.NoError(t, err2) +} + +func Test_Tcp_Echo2(t *testing.T) { + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + + w, _ := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Unix_Start2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Unix_Failboot2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../tests/failboot.php") + + w, err := NewSocketServer(ls, time.Second*5, log).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) +} + +func Test_Unix_Timeout2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*100, log).SpawnWorker(cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "relay timeout") +} + +func Test_Unix_Invalid2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*10, log).SpawnWorker(cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Unix_Broken2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + errC := ls.Close() + assert.NoError(t, errC) + }() + + cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + errW := w.Wait() + assert.Error(t, errW) + }() + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) + wg.Wait() + + time.Sleep(time.Second) + err = w.Stop() + assert.NoError(t, err) +} + +func Test_Unix_Echo2(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(t, err) + defer func() { + err = ls.Close() + assert.NoError(t, err) + }() + + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(b, err) + defer func() { + err = ls.Close() + assert.NoError(b, err) + }() + + f := NewSocketServer(ls, time.Minute, log) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + + w, err := f.SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + go func() { + assert.NoError(b, w.Wait()) + }() + + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { + ls, err := net.Listen("unix", "sock.unix") + assert.NoError(b, err) + defer func() { + err = ls.Close() + assert.NoError(b, err) + }() + + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { + defer func() { + _ = syscall.Unlink("sock.unix") + }() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + errC := ls.Close() + if errC != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute, log) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + + w, err := f.SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { + defer func() { + _ = syscall.Unlink("sock.unix") + }() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + errC := ls.Close() + if errC != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} diff --git a/ipc/socket/socket_factory_test.go b/ipc/socket/socket_factory_test.go new file mode 100755 index 00000000..9b48d233 --- /dev/null +++ b/ipc/socket/socket_factory_test.go @@ -0,0 +1,609 @@ +package socket + +import ( + "context" + "net" + "os/exec" + "sync" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_Tcp_Start(t *testing.T) { + ctx := context.Background() + time.Sleep(time.Millisecond * 10) // to ensure free socket + + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartCloseFactory(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + f := NewSocketServer(ls, time.Minute, log) + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + require.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Tcp_StartError(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") + err = cmd.Start() + if err != nil { + t.Errorf("error executing the command: error %v", err) + } + + serv := NewSocketServer(ls, time.Minute, log) + time.Sleep(time.Second * 2) + w, err := serv.SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Failboot(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/failboot.php") + + w, err2 := NewSocketServer(ls, time.Second*5, log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err2) +} + +func Test_Tcp_Timeout(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*1, log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} + +func Test_Tcp_Invalid(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*1, log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Tcp_Broken(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") + + w, err := NewSocketServer(ls, time.Second*10, log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + errW := w.Wait() + assert.Error(t, errW) + }() + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + assert.Error(t, err) + assert.Nil(t, res) + wg.Wait() + + time.Sleep(time.Second) + err2 := w.Stop() + // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection + // but process is stopped + assert.NoError(t, err2) +} + +func Test_Tcp_Echo(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + + w, _ := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Tcp_Echo_Script(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if assert.NoError(t, err) { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("sh", "../../tests/socket_test_script.sh") + + w, _ := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Unix_Start(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) + assert.NoError(t, err) + assert.NotNil(t, w) + + go func() { + assert.NoError(t, w.Wait()) + }() + + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } +} + +func Test_Unix_Failboot(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/failboot.php") + + w, err := NewSocketServer(ls, time.Second*5, log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) +} + +func Test_Unix_Timeout(t *testing.T) { + ls, err := net.Listen("unix", "sock.unix") + ctx := context.Background() + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") + + w, err := NewSocketServer(ls, time.Millisecond*100, log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} + +func Test_Unix_Invalid(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/invalid.php") + + w, err := NewSocketServer(ls, time.Second*10, log).SpawnWorkerWithTimeout(ctx, cmd) + assert.Error(t, err) + assert.Nil(t, w) +} + +func Test_Unix_Broken(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + errC := ls.Close() + if errC != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + wg := &sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + errW := w.Wait() + assert.Error(t, errW) + }() + + sw := worker.From(w) + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.Error(t, err) + assert.Nil(t, res) + + time.Sleep(time.Second) + err = w.Stop() + assert.NoError(t, err) + + wg.Wait() +} + +func Test_Unix_Echo(t *testing.T) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + t.Errorf("error closing the listener: error %v", err) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + t.Fatal(err) + } + go func() { + assert.NoError(t, w.Wait()) + }() + defer func() { + err = w.Stop() + if err != nil { + t.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Empty(t, res.Context) + + assert.Equal(t, "hello", res.String()) +} + +func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute, log) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + go func() { + assert.NoError(b, w.Wait()) + }() + + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("tcp", "127.0.0.1:9007") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} + +func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + f := NewSocketServer(ls, time.Minute, log) + for n := 0; n < b.N; n++ { + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + } +} + +func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { + ctx := context.Background() + ls, err := net.Listen("unix", "sock.unix") + if err == nil { + defer func() { + err = ls.Close() + if err != nil { + b.Errorf("error closing the listener: error %v", err) + } + }() + } else { + b.Skip("socket is busy") + } + + cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") + + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) + if err != nil { + b.Fatal(err) + } + defer func() { + err = w.Stop() + if err != nil { + b.Errorf("error stopping the Process: error %v", err) + } + }() + + sw := worker.From(w) + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +} diff --git a/pool/static_pool.go b/pool/static_pool.go index 019c34b2..dfd9ffd3 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -7,8 +7,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/ipc" "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/transport" "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher" @@ -36,7 +36,7 @@ type StaticPool struct { cmd Command // creates and connects to stack - factory transport.Factory + factory ipc.Factory // manages worker states and TTLs ww Watcher @@ -49,7 +49,7 @@ type StaticPool struct { } // NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { +func NewStaticPool(ctx context.Context, cmd Command, factory ipc.Factory, cfg *Config, options ...Options) (Pool, error) { if factory == nil { return nil, errors.Str("no factory initialized") } @@ -303,7 +303,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work return w, nil } -func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { +func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory ipc.Factory, cmd func() *exec.Cmd) worker.Allocator { return func() (worker.SyncWorker, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) defer cancel() diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index 4f98ca91..5db2bd86 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -2,7 +2,7 @@ package pool import ( "context" - "log" + l "log" "os/exec" "runtime" "strconv" @@ -11,8 +11,8 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/ipc/pipe" "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/transport/pipe" "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" @@ -26,12 +26,14 @@ var cfg = &Config{ DestroyTimeout: time.Second * 500, } +var log = zap.NewNop() + func Test_NewPool(t *testing.T) { ctx := context.Background() p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -46,7 +48,7 @@ func Test_NewPoolReset(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -73,7 +75,7 @@ func Test_StaticPool_Invalid(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) @@ -85,7 +87,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -102,7 +104,7 @@ func Test_StaticPool_Echo(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -126,7 +128,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -150,7 +152,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -174,7 +176,7 @@ func Test_StaticPool_JobError(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -203,7 +205,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, WithLogger(z), ) @@ -230,7 +232,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg2, ) assert.NoError(t, err) @@ -268,7 +270,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, @@ -287,7 +289,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, MaxJobs: 1, @@ -326,7 +328,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ Debug: true, AllocateTimeout: time.Second, @@ -368,7 +370,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -409,7 +411,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -431,7 +433,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -461,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second * 100, @@ -486,7 +488,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -507,7 +509,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx, // sleep for the 3 seconds func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ Debug: false, NumWorkers: 1, @@ -538,7 +540,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -555,7 +557,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -572,7 +574,7 @@ func Test_CRC_WithPayload(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.Error(t, err) @@ -604,7 +606,7 @@ func Benchmark_Pool_Echo(b *testing.B) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) if err != nil { @@ -636,7 +638,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, @@ -664,7 +666,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { defer wg.Done() if _, err := p.Exec(pld); err != nil { b.Fail() - log.Println(err) + l.Println(err) } }() } @@ -678,7 +680,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, MaxJobs: 1, @@ -694,7 +696,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { for n := 0; n < b.N; n++ { if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() - log.Println(err) + l.Println(err) } } } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index 6ff62316..a479671f 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" + "github.com/spiral/roadrunner/v2/ipc/pipe" "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/transport/pipe" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -32,7 +32,7 @@ func TestSupervisedPool_Exec(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgSupervised, ) @@ -62,7 +62,7 @@ func Test_SupervisedPoolReset(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgSupervised, ) assert.NoError(t, err) @@ -93,7 +93,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgSupervised, ) @@ -131,7 +131,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -166,7 +166,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -223,7 +223,7 @@ func TestSupervisedPool_Idle(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -273,7 +273,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -321,7 +321,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -367,7 +367,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -402,7 +402,7 @@ func TestSupervisedPool_AllocateFailedOK(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) diff --git a/transport/interface.go b/transport/interface.go deleted file mode 100644 index 0d6c8e8b..00000000 --- a/transport/interface.go +++ /dev/null @@ -1,20 +0,0 @@ -package transport - -import ( - "context" - "os/exec" - - "github.com/spiral/roadrunner/v2/worker" -) - -// Factory is responsible for wrapping given command into tasks WorkerProcess. -type Factory interface { - // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. - // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (*worker.Process, error) - // SpawnWorker creates new WorkerProcess process based on given command. - // Process must not be started. - SpawnWorker(*exec.Cmd) (*worker.Process, error) - // Close the factory and underlying connections. - Close() error -} diff --git a/transport/pipe/pipe_factory.go b/transport/pipe/pipe_factory.go deleted file mode 100755 index c70b3f65..00000000 --- a/transport/pipe/pipe_factory.go +++ /dev/null @@ -1,173 +0,0 @@ -package pipe - -import ( - "context" - "os/exec" - - "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/worker" -) - -// Factory connects to stack using standard -// streams (STDIN, STDOUT pipes). -type Factory struct{} - -// NewPipeFactory returns new factory instance and starts -// listening -func NewPipeFactory() *Factory { - return &Factory{} -} - -type sr struct { - w *worker.Process - err error -} - -// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, -// method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { - spCh := make(chan sr) - go func() { - w, err := worker.InitBaseWorker(cmd) - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - return - } - } - - in, err := cmd.StdoutPipe() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - return - } - } - - out, err := cmd.StdinPipe() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - return - } - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - return - } - } - - // used as a ping - _, err = internal.Pid(relay) - if err != nil { - _ = w.Kill() - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - _ = w.Kill() - return - } - } - - select { - case - // return worker - spCh <- sr{ - w: w, - err: nil, - }: - // everything ok, set ready state - w.State().Set(worker.StateReady) - return - default: - _ = w.Kill() - return - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-spCh: - if res.err != nil { - return nil, res.err - } - return res.w, nil - } -} - -func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd) - if err != nil { - return nil, err - } - - in, err := cmd.StdoutPipe() - if err != nil { - return nil, err - } - - out, err := cmd.StdinPipe() - if err != nil { - return nil, err - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - return nil, err - } - - // errors bundle - _, err = internal.Pid(relay) - if err != nil { - _ = w.Kill() - return nil, err - } - - // everything ok, set ready state - w.State().Set(worker.StateReady) - return w, nil -} - -// Close the factory. -func (f *Factory) Close() error { - return nil -} diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go deleted file mode 100644 index 9aa12564..00000000 --- a/transport/pipe/pipe_factory_spawn_test.go +++ /dev/null @@ -1,427 +0,0 @@ -package pipe - -import ( - "os/exec" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/worker" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_GetState2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - assert.Equal(t, worker.StateStopped, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, worker.StateReady, w.State().Value()) - assert.NoError(t, w.Stop()) -} - -func Test_Kill2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.Error(t, w.Wait()) - assert.Equal(t, worker.StateErrored, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, worker.StateReady, w.State().Value()) - err = w.Kill() - if err != nil { - t.Errorf("error killing the Process: error %v", err) - } - wg.Wait() -} - -func Test_Pipe_Start2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - assert.NoError(t, w.Stop()) -} - -func Test_Pipe_StartError2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - err := cmd.Start() - if err != nil { - t.Errorf("error running the command: error %v", err) - } - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError3(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError4(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Failboot2(t *testing.T) { - cmd := exec.Command("php", "../../tests/failboot.php") - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Nil(t, w) - assert.Error(t, err) -} - -func Test_Pipe_Invalid2(t *testing.T) { - cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.NoError(t, err) - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - go func() { - if w.Wait() != nil { - t.Fail() - } - }() - - assert.Equal(t, "hello", res.String()) - err = w.Stop() - assert.NoError(t, err) -} - -func Test_Pipe_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - assert.NoError(t, err) - require.NotNil(t, w) - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - - time.Sleep(time.Second) - err = w.Stop() - assert.Error(t, err) -} - -func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { - f := NewPipeFactory() - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorker(cmd) - go func() { - if w.Wait() != nil { - b.Fail() - } - }() - - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - sw := worker.From(w) - - b.ReportAllocs() - b.ResetTimer() - go func() { - err := w.Wait() - if err != nil { - b.Errorf("error waiting the worker: error %v", err) - } - }() - defer func() { - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - }() - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Test_Echo2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - - sw := worker.From(w) - - go func() { - assert.NoError(t, sw.Wait()) - }() - defer func() { - err = sw.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_BadPayload2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - - sw := worker.From(w) - - go func() { - assert.NoError(t, sw.Wait()) - }() - defer func() { - err := sw.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := sw.Exec(&payload.Payload{}) - assert.Error(t, err) - assert.Nil(t, res) - - assert.Contains(t, err.Error(), "payload can not be empty") -} - -func Test_String2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") - assert.Contains(t, w.String(), "ready") - assert.Contains(t, w.String(), "num_execs: 0") -} - -func Test_Echo_Slow2(t *testing.T) { - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Broken2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - - w, err := NewPipeFactory().SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res) - - time.Sleep(time.Second * 3) - assert.Error(t, w.Stop()) -} - -func Test_Error2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res) - - if errors.Is(errors.SoftJob, err) == false { - t.Fatal("error should be of type errors.ErrSoftJob") - } - assert.Contains(t, err.Error(), "hello") -} - -func Test_NumExecs2(t *testing.T) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(1), w.State().NumExecs()) - - _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(2), w.State().NumExecs()) - - _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(3), w.State().NumExecs()) -} diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go deleted file mode 100755 index cbf1431a..00000000 --- a/transport/pipe/pipe_factory_test.go +++ /dev/null @@ -1,511 +0,0 @@ -package pipe - -import ( - "context" - "os/exec" - "sync" - "testing" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/worker" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_GetState(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - assert.Equal(t, worker.StateStopped, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, worker.StateReady, w.State().Value()) - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Kill(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - assert.Error(t, w.Wait()) - assert.Equal(t, worker.StateErrored, w.State().Value()) - }() - - assert.NoError(t, err) - assert.NotNil(t, w) - - assert.Equal(t, worker.StateReady, w.State().Value()) - err = w.Kill() - if err != nil { - t.Errorf("error killing the Process: error %v", err) - } - wg.Wait() -} - -func Test_Pipe_Start(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - assert.NoError(t, w.Stop()) -} - -func Test_Pipe_StartError(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - err := cmd.Start() - if err != nil { - t.Errorf("error running the command: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_PipeError2(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - // error cause - _, err := cmd.StdinPipe() - if err != nil { - t.Errorf("error creating the STDIN pipe: error %v", err) - } - - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Failboot(t *testing.T) { - cmd := exec.Command("php", "../../tests/failboot.php") - ctx := context.Background() - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - - assert.Nil(t, w) - assert.Error(t, err) -} - -func Test_Pipe_Invalid(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../tests/invalid.php") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Pipe_Echo(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - go func() { - if w.Wait() != nil { - t.Fail() - } - }() - - assert.Equal(t, "hello", res.String()) -} - -func Test_Pipe_Echo_Script(t *testing.T) { - t.Parallel() - cmd := exec.Command("sh", "../../tests/pipes_test_script.sh") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - go func() { - if w.Wait() != nil { - t.Fail() - } - }() - - assert.Equal(t, "hello", res.String()) -} - -func Test_Pipe_Broken(t *testing.T) { - t.Parallel() - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - require.NoError(t, err) - require.NotNil(t, w) - - go func() { - errW := w.Wait() - require.Error(t, errW) - }() - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - - time.Sleep(time.Second) - err = w.Stop() - assert.NoError(t, err) -} - -func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { - f := NewPipeFactory() - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) - go func() { - if w.Wait() != nil { - b.Fail() - } - }() - - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) - sw := worker.From(w) - - b.ReportAllocs() - b.ResetTimer() - go func() { - err := w.Wait() - if err != nil { - b.Errorf("error waiting the worker: error %v", err) - } - }() - defer func() { - err := w.Stop() - if err != nil { - b.Errorf("error stopping the worker: error %v", err) - } - }() - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Test_Echo(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - - sw := worker.From(w) - go func() { - assert.NoError(t, sw.Wait()) - }() - defer func() { - err = sw.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_BadPayload(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - - sw := worker.From(w) - - go func() { - assert.NoError(t, sw.Wait()) - }() - defer func() { - err := sw.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - res, err := sw.Exec(&payload.Payload{}) - - assert.Error(t, err) - assert.Nil(t, res) - - assert.Contains(t, err.Error(), "payload can not be empty") -} - -func Test_String(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - assert.Contains(t, w.String(), "php ../../tests/client.php echo pipes") - assert.Contains(t, w.String(), "ready") - assert.Contains(t, w.String(), "num_execs: 0") -} - -func Test_Echo_Slow(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Nil(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Broken(t *testing.T) { - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res) - - time.Sleep(time.Second * 3) - assert.Error(t, w.Stop()) -} - -func Test_Error(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.NotNil(t, err) - assert.Nil(t, res) - - if errors.Is(errors.SoftJob, err) == false { - t.Fatal("error should be of type errors.ErrSoftJob") - } - assert.Contains(t, err.Error(), "hello") -} - -func Test_NumExecs(t *testing.T) { - t.Parallel() - ctx := context.Background() - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err := w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(1), w.State().NumExecs()) - - _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(2), w.State().NumExecs()) - - _, err = sw.Exec(&payload.Payload{Body: []byte("hello")}) - if err != nil { - t.Errorf("fail to execute payload: error %v", err) - } - assert.Equal(t, uint64(3), w.State().NumExecs()) -} diff --git a/transport/socket/socket_factory.go b/transport/socket/socket_factory.go deleted file mode 100755 index 06d7000d..00000000 --- a/transport/socket/socket_factory.go +++ /dev/null @@ -1,248 +0,0 @@ -package socket - -import ( - "context" - "fmt" - "net" - "os/exec" - "sync" - "time" - - "github.com/shirou/gopsutil/process" - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/relay" - "github.com/spiral/goridge/v3/pkg/socket" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/worker" - - "golang.org/x/sync/errgroup" -) - -// Factory connects to external stack using socket server. -type Factory struct { - // listens for incoming connections from underlying processes - ls net.Listener - - // relay connection timeout - tout time.Duration - - // sockets which are waiting for process association - relays sync.Map -} - -// NewSocketServer returns Factory attached to a given socket listener. -// tout specifies for how long factory should serve for incoming relay connection -func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { - f := &Factory{ - ls: ls, - tout: tout, - relays: sync.Map{}, - } - - // Be careful - // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go - // https://github.com/golang/go/issues/5045 - go func() { - err := f.listen() - // there is no logger here, use fmt - if err != nil { - fmt.Printf("[WARN]: socket server listen, error: %v\n", err) - } - }() - - return f -} - -// blocking operation, returns an error -func (f *Factory) listen() error { - errGr := &errgroup.Group{} - errGr.Go(func() error { - for { - conn, err := f.ls.Accept() - if err != nil { - return err - } - - rl := socket.NewSocketRelay(conn) - pid, err := internal.Pid(rl) - if err != nil { - return err - } - - f.attachRelayToPid(pid, rl) - } - }) - - return errGr.Wait() -} - -type socketSpawn struct { - w *worker.Process - err error -} - -// SpawnWorkerWithTimeout creates Process and connects it to appropriate relay or return an error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { - c := make(chan socketSpawn) - go func() { - ctxT, cancel := context.WithTimeout(ctx, f.tout) - defer cancel() - w, err := worker.InitBaseWorker(cmd) - if err != nil { - select { - case c <- socketSpawn{ - w: nil, - err: err, - }: - return - default: - return - } - } - - err = w.Start() - if err != nil { - select { - case c <- socketSpawn{ - w: nil, - err: err, - }: - return - default: - return - } - } - - rl, err := f.findRelayWithContext(ctxT, w) - if err != nil { - _ = w.Kill() - select { - // try to write result - case c <- socketSpawn{ - w: nil, - err: err, - }: - return - // if no receivers - return - default: - return - } - } - - w.AttachRelay(rl) - w.State().Set(worker.StateReady) - - select { - case c <- socketSpawn{ - w: w, - err: nil, - }: - return - default: - _ = w.Kill() - return - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-c: - if res.err != nil { - return nil, res.err - } - - return res.w, nil - } -} - -func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd) - if err != nil { - return nil, err - } - - err = w.Start() - if err != nil { - return nil, err - } - - rl, err := f.findRelay(w) - if err != nil { - _ = w.Kill() - return nil, err - } - - w.AttachRelay(rl) - - // errors bundle - _, err = internal.Pid(rl) - if err != nil { - _ = w.Kill() - return nil, err - } - - w.State().Set(worker.StateReady) - - return w, nil -} - -// Close socket factory and underlying socket connection. -func (f *Factory) Close() error { - return f.ls.Close() -} - -// waits for Process to connect over socket and returns associated relay of timeout -func (f *Factory) findRelayWithContext(ctx context.Context, w worker.BaseProcess) (*socket.Relay, error) { - ticker := time.NewTicker(time.Millisecond * 10) - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-ticker.C: - // check for the process exists - _, err := process.NewProcess(int32(w.Pid())) - if err != nil { - return nil, err - } - default: - // find first pid and attach relay to it - var r *socket.Relay - f.relays.Range(func(k, val interface{}) bool { - r = val.(*socket.Relay) - f.relays.Delete(k) - return false - }) - - // no relay exists - if r == nil { - continue - } - - return r, nil - } - } -} - -func (f *Factory) findRelay(w worker.BaseProcess) (*socket.Relay, error) { - const op = errors.Op("factory_find_relay") - // poll every 1ms for the relay - pollDone := time.NewTimer(f.tout) - for { - select { - case <-pollDone.C: - return nil, errors.E(op, errors.Str("relay timeout")) - default: - tmp, ok := f.relays.Load(w.Pid()) - if !ok { - continue - } - return tmp.(*socket.Relay), nil - } - } -} - -// chan to store relay associated with specific pid -func (f *Factory) attachRelayToPid(pid int64, relay relay.Relay) { - f.relays.Store(pid, relay) -} diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go deleted file mode 100644 index 7fc6f4a5..00000000 --- a/transport/socket/socket_factory_spawn_test.go +++ /dev/null @@ -1,482 +0,0 @@ -package socket - -import ( - "net" - "os/exec" - "sync" - "syscall" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/worker" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_Tcp_Start2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartCloseFactory2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - f := NewSocketServer(ls, time.Minute) - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - w, err := f.SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - require.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartError2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - err = cmd.Start() - if err != nil { - t.Errorf("error executing the command: error %v", err) - } - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Failboot2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err3 := ls.Close() - if err3 != nil { - t.Errorf("error closing the listener: error %v", err3) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/failboot.php") - - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) - assert.Nil(t, w) - assert.Error(t, err2) -} - -func Test_Tcp_Invalid2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Broken2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - errW := w.Wait() - assert.Error(t, errW) - }() - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - wg.Wait() - - time.Sleep(time.Second) - err2 := w.Stop() - // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection - // but process exited - assert.NoError(t, err2) -} - -func Test_Tcp_Echo2(t *testing.T) { - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Unix_Start2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Unix_Failboot2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/failboot.php") - - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) - assert.Nil(t, w) - assert.Error(t, err) -} - -func Test_Unix_Timeout2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "relay timeout") -} - -func Test_Unix_Invalid2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Unix_Broken2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - errC := ls.Close() - assert.NoError(t, errC) - }() - - cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - errW := w.Wait() - assert.Error(t, errW) - }() - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res) - wg.Wait() - - time.Sleep(time.Second) - err = w.Stop() - assert.NoError(t, err) -} - -func Test_Unix_Echo2(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(t, err) - defer func() { - err = ls.Close() - assert.NoError(t, err) - }() - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - t.Fatal(err) - } - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(b, err) - defer func() { - err = ls.Close() - assert.NoError(b, err) - }() - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := f.SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - go func() { - assert.NoError(b, w.Wait()) - }() - - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { - ls, err := net.Listen("unix", "sock.unix") - assert.NoError(b, err) - defer func() { - err = ls.Close() - assert.NoError(b, err) - }() - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { - defer func() { - _ = syscall.Unlink("sock.unix") - }() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - errC := ls.Close() - if errC != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := f.SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { - defer func() { - _ = syscall.Unlink("sock.unix") - }() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - errC := ls.Close() - if errC != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go deleted file mode 100755 index 5a078be4..00000000 --- a/transport/socket/socket_factory_test.go +++ /dev/null @@ -1,609 +0,0 @@ -package socket - -import ( - "context" - "net" - "os/exec" - "sync" - "testing" - "time" - - "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/worker" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_Tcp_Start(t *testing.T) { - ctx := context.Background() - time.Sleep(time.Millisecond * 10) // to ensure free socket - - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartCloseFactory(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - f := NewSocketServer(ls, time.Minute) - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - - w, err := f.SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - require.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Tcp_StartError(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - err = cmd.Start() - if err != nil { - t.Errorf("error executing the command: error %v", err) - } - - serv := NewSocketServer(ls, time.Minute) - time.Sleep(time.Second * 2) - w, err := serv.SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Failboot(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err3 := ls.Close() - if err3 != nil { - t.Errorf("error closing the listener: error %v", err3) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/failboot.php") - - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err2) -} - -func Test_Tcp_Timeout(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0") - - w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") -} - -func Test_Tcp_Invalid(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Tcp_Broken(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - errW := w.Wait() - assert.Error(t, errW) - }() - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - assert.Error(t, err) - assert.Nil(t, res) - wg.Wait() - - time.Sleep(time.Second) - err2 := w.Stop() - // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection - // but process is stopped - assert.NoError(t, err2) -} - -func Test_Tcp_Echo(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Tcp_Echo_Script(t *testing.T) { - time.Sleep(time.Millisecond * 10) // to ensure free socket - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if assert.NoError(t, err) { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("sh", "../../tests/socket_test_script.sh") - - w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Test_Unix_Start(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - assert.NoError(t, err) - assert.NotNil(t, w) - - go func() { - assert.NoError(t, w.Wait()) - }() - - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } -} - -func Test_Unix_Failboot(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - ctx := context.Background() - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/failboot.php") - - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err) -} - -func Test_Unix_Timeout(t *testing.T) { - ls, err := net.Listen("unix", "sock.unix") - ctx := context.Background() - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) - assert.Nil(t, w) - assert.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") -} - -func Test_Unix_Invalid(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/invalid.php") - - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) - assert.Error(t, err) - assert.Nil(t, w) -} - -func Test_Unix_Broken(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - errC := ls.Close() - if errC != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - errW := w.Wait() - assert.Error(t, errW) - }() - - sw := worker.From(w) - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.Error(t, err) - assert.Nil(t, res) - - time.Sleep(time.Second) - err = w.Stop() - assert.NoError(t, err) - - wg.Wait() -} - -func Test_Unix_Echo(t *testing.T) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - t.Errorf("error closing the listener: error %v", err) - } - }() - } else { - t.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - t.Fatal(err) - } - go func() { - assert.NoError(t, w.Wait()) - }() - defer func() { - err = w.Stop() - if err != nil { - t.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - res, err := sw.Exec(&payload.Payload{Body: []byte("hello")}) - - assert.NoError(t, err) - assert.NotNil(t, res) - assert.NotNil(t, res.Body) - assert.Empty(t, res.Context) - - assert.Equal(t, "hello", res.String()) -} - -func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := f.SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - go func() { - assert.NoError(b, w.Wait()) - }() - - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("tcp", "127.0.0.1:9007") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} - -func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - f := NewSocketServer(ls, time.Minute) - for n := 0; n < b.N; n++ { - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := f.SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - } -} - -func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { - ctx := context.Background() - ls, err := net.Listen("unix", "sock.unix") - if err == nil { - defer func() { - err = ls.Close() - if err != nil { - b.Errorf("error closing the listener: error %v", err) - } - }() - } else { - b.Skip("socket is busy") - } - - cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) - if err != nil { - b.Fatal(err) - } - defer func() { - err = w.Stop() - if err != nil { - b.Errorf("error stopping the Process: error %v", err) - } - }() - - sw := worker.From(w) - - for n := 0; n < b.N; n++ { - if _, err := sw.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { - b.Fail() - } - } -} diff --git a/worker/worker.go b/worker/worker.go index 8ca55a3b..52bdbacb 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -62,7 +62,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { } if w.log == nil { - z, err := zap.NewDevelopment() + z, err := zap.NewProduction() if err != nil { return nil, err } -- cgit v1.2.3 From e19829b90fa9c2d945cb1871c092c9538dcf4c35 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 12 Jan 2022 00:20:30 +0300 Subject: update deps Signed-off-by: Valery Piashchynski --- go.mod | 4 ++-- go.sum | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 7971cc66..b59509da 100644 --- a/go.mod +++ b/go.mod @@ -26,8 +26,8 @@ require ( github.com/tklauser/numcpus v0.3.0 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/zap v1.19.1 - golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect + go.uber.org/zap v1.20.0 + golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/go.sum b/go.sum index 744796fc..c23f0fe9 100644 --- a/go.sum +++ b/go.sum @@ -53,11 +53,14 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI= go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= +go.uber.org/zap v1.20.0 h1:N4oPlghZwYG55MlU6LXk/Zp00FVNE9X9wrYO8CEs4lc= +go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -78,6 +81,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -- cgit v1.2.3