diff options
author | Valery Piashchynski <[email protected]> | 2022-01-12 00:28:16 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2022-01-12 00:28:16 +0300 |
commit | a191eac78092dda89dbcd19c7a3a171f6aafb71a (patch) | |
tree | 48db67c0329dc33f63bd9784fbcf50a410434ebe | |
parent | e4ee005938a388de4e4bbb9fad097b563989e158 (diff) | |
parent | 657e5969414419180af49e1440e91c0f271985da (diff) |
[#888]: bug(logger): the RR logger wasn't passed from the poolv2.7.0-rc.2
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 7 | ||||
-rw-r--r-- | ipc/interface.go (renamed from transport/interface.go) | 2 | ||||
-rwxr-xr-x | ipc/pipe/pipe_factory.go (renamed from transport/pipe/pipe_factory.go) | 15 | ||||
-rw-r--r-- | ipc/pipe/pipe_factory_spawn_test.go (renamed from transport/pipe/pipe_factory_spawn_test.go) | 45 | ||||
-rwxr-xr-x | ipc/pipe/pipe_factory_test.go (renamed from transport/pipe/pipe_factory_test.go) | 44 | ||||
-rwxr-xr-x | ipc/socket/socket_factory.go (renamed from transport/socket/socket_factory.go) | 10 | ||||
-rw-r--r-- | ipc/socket/socket_factory_spawn_test.go (renamed from transport/socket/socket_factory_spawn_test.go) | 37 | ||||
-rwxr-xr-x | ipc/socket/socket_factory_test.go (renamed from transport/socket/socket_factory_test.go) | 38 | ||||
-rwxr-xr-x | pool/static_pool.go | 8 | ||||
-rwxr-xr-x | pool/static_pool_test.go | 60 | ||||
-rw-r--r-- | pool/supervisor_test.go | 22 | ||||
-rwxr-xr-x | worker/worker.go | 2 |
14 files changed, 161 insertions, 139 deletions
@@ -7,8 +7,8 @@ SHELL = /bin/sh test_coverage: rm -rf coverage-ci mkdir ./coverage-ci - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./transport/pipe - go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./transport/socket + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./ipc/pipe + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./ipc/socket go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.out -covermode=atomic ./pool go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.out -covermode=atomic ./bst @@ -19,8 +19,8 @@ test_coverage: tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt test: ## Run application tests - go test -v -race -tags=debug ./transport/pipe - go test -v -race -tags=debug ./transport/socket + go test -v -race -tags=debug ./ipc/pipe + go test -v -race -tags=debug ./ipc/socket go test -v -race -tags=debug ./pool go test -v -race -tags=debug ./worker go test -v -race -tags=debug ./worker_watcher @@ -27,7 +27,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.2 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/zap v1.20.0 - golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect + golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) @@ -51,11 +51,14 @@ github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4= +go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec= go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= +go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI= +go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI= go.uber.org/zap v1.20.0 h1:N4oPlghZwYG55MlU6LXk/Zp00FVNE9X9wrYO8CEs4lc= go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -78,6 +81,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY= +golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/transport/interface.go b/ipc/interface.go index 0d6c8e8b..1d70cd21 100644 --- a/transport/interface.go +++ b/ipc/interface.go @@ -1,4 +1,4 @@ -package transport +package ipc import ( "context" diff --git a/transport/pipe/pipe_factory.go b/ipc/pipe/pipe_factory.go index c70b3f65..4a3c9a67 100755 --- a/transport/pipe/pipe_factory.go +++ b/ipc/pipe/pipe_factory.go @@ -7,16 +7,21 @@ import ( "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" + "go.uber.org/zap" ) // Factory connects to stack using standard // streams (STDIN, STDOUT pipes). -type Factory struct{} +type Factory struct { + log *zap.Logger +} // NewPipeFactory returns new factory instance and starts // listening -func NewPipeFactory() *Factory { - return &Factory{} +func NewPipeFactory(log *zap.Logger) *Factory { + return &Factory{ + log: log, + } } type sr struct { @@ -29,7 +34,7 @@ type sr struct { func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { spCh := make(chan sr) go func() { - w, err := worker.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) if err != nil { select { case spCh <- sr{ @@ -130,7 +135,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*w } func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) if err != nil { return nil, err } diff --git a/transport/pipe/pipe_factory_spawn_test.go b/ipc/pipe/pipe_factory_spawn_test.go index 9aa12564..2ce5a257 100644 --- a/transport/pipe/pipe_factory_spawn_test.go +++ b/ipc/pipe/pipe_factory_spawn_test.go @@ -11,12 +11,15 @@ import ( "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) +var log = zap.NewNop() + func Test_GetState2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) assert.Equal(t, worker.StateStopped, w.State().Value()) @@ -32,7 +35,7 @@ func Test_GetState2(t *testing.T) { func Test_Kill2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) wg := &sync.WaitGroup{} wg.Add(1) go func() { @@ -55,7 +58,7 @@ func Test_Kill2(t *testing.T) { func Test_Pipe_Start2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -73,7 +76,7 @@ func Test_Pipe_StartError2(t *testing.T) { t.Errorf("error running the command: error %v", err) } - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -85,7 +88,7 @@ func Test_Pipe_PipeError3(t *testing.T) { t.Errorf("error creating the STDIN pipe: error %v", err) } - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -97,28 +100,28 @@ func Test_Pipe_PipeError4(t *testing.T) { t.Errorf("error creating the STDIN pipe: error %v", err) } - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) assert.Error(t, err) assert.Nil(t, w) } func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) } func Test_Pipe_Invalid2(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) assert.Error(t, err) assert.Nil(t, w) } func Test_Pipe_Echo2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) assert.NoError(t, err) sw := worker.From(w) @@ -142,7 +145,7 @@ func Test_Pipe_Echo2(t *testing.T) { func Test_Pipe_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) assert.NoError(t, err) require.NotNil(t, w) @@ -157,7 +160,7 @@ func Test_Pipe_Broken2(t *testing.T) { } func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { - f := NewPipeFactory() + f := NewPipeFactory(log) for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, _ := f.SpawnWorker(cmd) @@ -177,7 +180,7 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorker(cmd) + w, _ := NewPipeFactory(log).SpawnWorker(cmd) sw := worker.From(w) b.ReportAllocs() @@ -204,7 +207,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) if err != nil { b.Fatal(err) } @@ -227,7 +230,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) { func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) if err != nil { b.Fatal(err) } @@ -251,7 +254,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) { func Test_Echo2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -281,7 +284,7 @@ func Test_Echo2(t *testing.T) { func Test_BadPayload2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorker(cmd) + w, _ := NewPipeFactory(log).SpawnWorker(cmd) sw := worker.From(w) @@ -305,7 +308,7 @@ func Test_BadPayload2(t *testing.T) { func Test_String2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorker(cmd) + w, _ := NewPipeFactory(log).SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -324,7 +327,7 @@ func Test_String2(t *testing.T) { func Test_Echo_Slow2(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") - w, _ := NewPipeFactory().SpawnWorker(cmd) + w, _ := NewPipeFactory(log).SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -350,7 +353,7 @@ func Test_Echo_Slow2(t *testing.T) { func Test_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory().SpawnWorker(cmd) + w, err := NewPipeFactory(log).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -367,7 +370,7 @@ func Test_Broken2(t *testing.T) { func Test_Error2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") - w, _ := NewPipeFactory().SpawnWorker(cmd) + w, _ := NewPipeFactory(log).SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -394,7 +397,7 @@ func Test_Error2(t *testing.T) { func Test_NumExecs2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorker(cmd) + w, _ := NewPipeFactory(log).SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) }() diff --git a/transport/pipe/pipe_factory_test.go b/ipc/pipe/pipe_factory_test.go index cbf1431a..025498b5 100755 --- a/transport/pipe/pipe_factory_test.go +++ b/ipc/pipe/pipe_factory_test.go @@ -19,7 +19,7 @@ func Test_GetState(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) assert.Equal(t, worker.StateStopped, w.State().Value()) @@ -40,7 +40,7 @@ func Test_Kill(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) wg := &sync.WaitGroup{} wg.Add(1) go func() { @@ -65,7 +65,7 @@ func Test_Pipe_Start(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -85,7 +85,7 @@ func Test_Pipe_StartError(t *testing.T) { } ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -99,7 +99,7 @@ func Test_Pipe_PipeError(t *testing.T) { } ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -114,7 +114,7 @@ func Test_Pipe_PipeError2(t *testing.T) { } ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -123,7 +123,7 @@ func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) @@ -133,7 +133,7 @@ func Test_Pipe_Invalid(t *testing.T) { t.Parallel() cmd := exec.Command("php", "../../tests/invalid.php") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -142,7 +142,7 @@ func Test_Pipe_Echo(t *testing.T) { t.Parallel() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -174,7 +174,7 @@ func Test_Pipe_Echo_Script(t *testing.T) { t.Parallel() cmd := exec.Command("sh", "../../tests/pipes_test_script.sh") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -205,7 +205,7 @@ func Test_Pipe_Broken(t *testing.T) { t.Parallel() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) require.NoError(t, err) require.NotNil(t, w) @@ -225,7 +225,7 @@ func Test_Pipe_Broken(t *testing.T) { } func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { - f := NewPipeFactory() + f := NewPipeFactory(log) for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) @@ -245,7 +245,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(context.Background(), cmd) sw := worker.From(w) b.ReportAllocs() @@ -273,7 +273,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } @@ -297,7 +297,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } @@ -323,7 +323,7 @@ func Test_Echo(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -354,7 +354,7 @@ func Test_BadPayload(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) sw := worker.From(w) @@ -381,7 +381,7 @@ func Test_String(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -402,7 +402,7 @@ func Test_Echo_Slow(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -429,7 +429,7 @@ func Test_Broken(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -449,7 +449,7 @@ func Test_Error(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -478,7 +478,7 @@ func Test_NumExecs(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() diff --git a/transport/socket/socket_factory.go b/ipc/socket/socket_factory.go index 06d7000d..aa356424 100755 --- a/transport/socket/socket_factory.go +++ b/ipc/socket/socket_factory.go @@ -14,6 +14,7 @@ import ( "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/worker" + "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -28,15 +29,18 @@ type Factory struct { // sockets which are waiting for process association relays sync.Map + + log *zap.Logger } // NewSocketServer returns Factory attached to a given socket listener. // tout specifies for how long factory should serve for incoming relay connection -func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { +func NewSocketServer(ls net.Listener, tout time.Duration, log *zap.Logger) *Factory { f := &Factory{ ls: ls, tout: tout, relays: sync.Map{}, + log: log, } // Be careful @@ -87,7 +91,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*w go func() { ctxT, cancel := context.WithTimeout(ctx, f.tout) defer cancel() - w, err := worker.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) if err != nil { select { case c <- socketSpawn{ @@ -157,7 +161,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*w } func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd) + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) if err != nil { return nil, err } diff --git a/transport/socket/socket_factory_spawn_test.go b/ipc/socket/socket_factory_spawn_test.go index 7fc6f4a5..36c6cce2 100644 --- a/transport/socket/socket_factory_spawn_test.go +++ b/ipc/socket/socket_factory_spawn_test.go @@ -12,8 +12,11 @@ import ( "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) +var log = zap.NewNop() + func Test_Tcp_Start2(t *testing.T) { ls, err := net.Listen("tcp", "127.0.0.1:9007") if assert.NoError(t, err) { @@ -29,7 +32,7 @@ func Test_Tcp_Start2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -51,7 +54,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) { } cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - f := NewSocketServer(ls, time.Minute) + f := NewSocketServer(ls, time.Minute, log) defer func() { err = ls.Close() if err != nil { @@ -92,7 +95,7 @@ func Test_Tcp_StartError2(t *testing.T) { t.Errorf("error executing the command: error %v", err) } - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -112,7 +115,7 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + w, err2 := NewSocketServer(ls, time.Second*5, log).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err2) } @@ -132,7 +135,7 @@ func Test_Tcp_Invalid2(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Second*1, log).SpawnWorker(cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -152,7 +155,7 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -192,7 +195,7 @@ func Test_Tcp_Echo2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + w, _ := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -225,7 +228,7 @@ func Test_Unix_Start2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -249,7 +252,7 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Second*5, log).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) } @@ -264,7 +267,7 @@ func Test_Unix_Timeout2(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Millisecond*100, log).SpawnWorker(cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "relay timeout") @@ -280,7 +283,7 @@ func Test_Unix_Invalid2(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Second*10, log).SpawnWorker(cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -295,7 +298,7 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -329,7 +332,7 @@ func Test_Unix_Echo2(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) if err != nil { t.Fatal(err) } @@ -363,7 +366,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) { assert.NoError(b, err) }() - f := NewSocketServer(ls, time.Minute) + f := NewSocketServer(ls, time.Minute, log) for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") @@ -392,7 +395,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) if err != nil { b.Fatal(err) } @@ -428,7 +431,7 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) { b.Skip("socket is busy") } - f := NewSocketServer(ls, time.Minute) + f := NewSocketServer(ls, time.Minute, log) for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") @@ -461,7 +464,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd) if err != nil { b.Fatal(err) } diff --git a/transport/socket/socket_factory_test.go b/ipc/socket/socket_factory_test.go index 5a078be4..9b48d233 100755 --- a/transport/socket/socket_factory_test.go +++ b/ipc/socket/socket_factory_test.go @@ -32,7 +32,7 @@ func Test_Tcp_Start(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -56,7 +56,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { } cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - f := NewSocketServer(ls, time.Minute) + f := NewSocketServer(ls, time.Minute, log) defer func() { err = ls.Close() if err != nil { @@ -101,7 +101,7 @@ func Test_Tcp_StartError(t *testing.T) { t.Errorf("error executing the command: error %v", err) } - serv := NewSocketServer(ls, time.Minute) + serv := NewSocketServer(ls, time.Minute, log) time.Sleep(time.Second * 2) w, err := serv.SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) @@ -126,7 +126,7 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + w, err2 := NewSocketServer(ls, time.Second*5, log).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err2) } @@ -148,7 +148,7 @@ func Test_Tcp_Timeout(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0") - w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Millisecond*1, log).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") @@ -171,7 +171,7 @@ func Test_Tcp_Invalid(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*1, log).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -193,7 +193,7 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*10, log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -235,7 +235,7 @@ func Test_Tcp_Echo(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + w, _ := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -275,7 +275,7 @@ func Test_Tcp_Echo_Script(t *testing.T) { cmd := exec.Command("sh", "../../tests/socket_test_script.sh") - w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + w, _ := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() @@ -314,7 +314,7 @@ func Test_Unix_Start(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -344,7 +344,7 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*5, log).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) } @@ -365,7 +365,7 @@ func Test_Unix_Timeout(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Millisecond*100, log).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") @@ -387,7 +387,7 @@ func Test_Unix_Invalid(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*10, log).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -407,7 +407,7 @@ func Test_Unix_Broken(t *testing.T) { } cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -448,7 +448,7 @@ func Test_Unix_Echo(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -488,7 +488,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { b.Skip("socket is busy") } - f := NewSocketServer(ls, time.Minute) + f := NewSocketServer(ls, time.Minute, log) for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") @@ -523,7 +523,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } @@ -557,7 +557,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { b.Skip("socket is busy") } - f := NewSocketServer(ls, time.Minute) + f := NewSocketServer(ls, time.Minute, log) for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") @@ -588,7 +588,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } diff --git a/pool/static_pool.go b/pool/static_pool.go index 019c34b2..dfd9ffd3 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -7,8 +7,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/ipc" "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/transport" "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher" @@ -36,7 +36,7 @@ type StaticPool struct { cmd Command // creates and connects to stack - factory transport.Factory + factory ipc.Factory // manages worker states and TTLs ww Watcher @@ -49,7 +49,7 @@ type StaticPool struct { } // NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { +func NewStaticPool(ctx context.Context, cmd Command, factory ipc.Factory, cfg *Config, options ...Options) (Pool, error) { if factory == nil { return nil, errors.Str("no factory initialized") } @@ -303,7 +303,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work return w, nil } -func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { +func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory ipc.Factory, cmd func() *exec.Cmd) worker.Allocator { return func() (worker.SyncWorker, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) defer cancel() diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index 4f98ca91..5db2bd86 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -2,7 +2,7 @@ package pool import ( "context" - "log" + l "log" "os/exec" "runtime" "strconv" @@ -11,8 +11,8 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/ipc/pipe" "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/transport/pipe" "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" @@ -26,12 +26,14 @@ var cfg = &Config{ DestroyTimeout: time.Second * 500, } +var log = zap.NewNop() + func Test_NewPool(t *testing.T) { ctx := context.Background() p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -46,7 +48,7 @@ func Test_NewPoolReset(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -73,7 +75,7 @@ func Test_StaticPool_Invalid(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) @@ -85,7 +87,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -102,7 +104,7 @@ func Test_StaticPool_Echo(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -126,7 +128,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -150,7 +152,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -174,7 +176,7 @@ func Test_StaticPool_JobError(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -203,7 +205,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, WithLogger(z), ) @@ -230,7 +232,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg2, ) assert.NoError(t, err) @@ -268,7 +270,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, @@ -287,7 +289,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, MaxJobs: 1, @@ -326,7 +328,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ Debug: true, AllocateTimeout: time.Second, @@ -368,7 +370,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -409,7 +411,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -431,7 +433,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -461,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second * 100, @@ -486,7 +488,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -507,7 +509,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx, // sleep for the 3 seconds func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ Debug: false, NumWorkers: 1, @@ -538,7 +540,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -555,7 +557,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -572,7 +574,7 @@ func Test_CRC_WithPayload(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.Error(t, err) @@ -604,7 +606,7 @@ func Benchmark_Pool_Echo(b *testing.B) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) if err != nil { @@ -636,7 +638,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, @@ -664,7 +666,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { defer wg.Done() if _, err := p.Exec(pld); err != nil { b.Fail() - log.Println(err) + l.Println(err) } }() } @@ -678,7 +680,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, MaxJobs: 1, @@ -694,7 +696,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { for n := 0; n < b.N; n++ { if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() - log.Println(err) + l.Println(err) } } } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index 6ff62316..a479671f 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" + "github.com/spiral/roadrunner/v2/ipc/pipe" "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/transport/pipe" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -32,7 +32,7 @@ func TestSupervisedPool_Exec(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgSupervised, ) @@ -62,7 +62,7 @@ func Test_SupervisedPoolReset(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgSupervised, ) assert.NoError(t, err) @@ -93,7 +93,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgSupervised, ) @@ -131,7 +131,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -166,7 +166,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -223,7 +223,7 @@ func TestSupervisedPool_Idle(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -273,7 +273,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -321,7 +321,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -367,7 +367,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -402,7 +402,7 @@ func TestSupervisedPool_AllocateFailedOK(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) diff --git a/worker/worker.go b/worker/worker.go index 8ca55a3b..52bdbacb 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -62,7 +62,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { } if w.log == nil { - z, err := zap.NewDevelopment() + z, err := zap.NewProduction() if err != nil { return nil, err } |