summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2022-01-12 00:28:16 +0300
committerGitHub <[email protected]>2022-01-12 00:28:16 +0300
commita191eac78092dda89dbcd19c7a3a171f6aafb71a (patch)
tree48db67c0329dc33f63bd9784fbcf50a410434ebe
parente4ee005938a388de4e4bbb9fad097b563989e158 (diff)
parent657e5969414419180af49e1440e91c0f271985da (diff)
[#888]: bug(logger): the RR logger wasn't passed from the poolv2.7.0-rc.2
-rw-r--r--Makefile8
-rw-r--r--go.mod2
-rw-r--r--go.sum7
-rw-r--r--ipc/interface.go (renamed from transport/interface.go)2
-rwxr-xr-xipc/pipe/pipe_factory.go (renamed from transport/pipe/pipe_factory.go)15
-rw-r--r--ipc/pipe/pipe_factory_spawn_test.go (renamed from transport/pipe/pipe_factory_spawn_test.go)45
-rwxr-xr-xipc/pipe/pipe_factory_test.go (renamed from transport/pipe/pipe_factory_test.go)44
-rwxr-xr-xipc/socket/socket_factory.go (renamed from transport/socket/socket_factory.go)10
-rw-r--r--ipc/socket/socket_factory_spawn_test.go (renamed from transport/socket/socket_factory_spawn_test.go)37
-rwxr-xr-xipc/socket/socket_factory_test.go (renamed from transport/socket/socket_factory_test.go)38
-rwxr-xr-xpool/static_pool.go8
-rwxr-xr-xpool/static_pool_test.go60
-rw-r--r--pool/supervisor_test.go22
-rwxr-xr-xworker/worker.go2
14 files changed, 161 insertions, 139 deletions
diff --git a/Makefile b/Makefile
index d2f4ebc6..67199bb0 100644
--- a/Makefile
+++ b/Makefile
@@ -7,8 +7,8 @@ SHELL = /bin/sh
test_coverage:
rm -rf coverage-ci
mkdir ./coverage-ci
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./transport/pipe
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./transport/socket
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./ipc/pipe
+ go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./ipc/socket
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pool.out -covermode=atomic ./pool
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/worker.out -covermode=atomic ./worker
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/bst.out -covermode=atomic ./bst
@@ -19,8 +19,8 @@ test_coverage:
tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt
test: ## Run application tests
- go test -v -race -tags=debug ./transport/pipe
- go test -v -race -tags=debug ./transport/socket
+ go test -v -race -tags=debug ./ipc/pipe
+ go test -v -race -tags=debug ./ipc/socket
go test -v -race -tags=debug ./pool
go test -v -race -tags=debug ./worker
go test -v -race -tags=debug ./worker_watcher
diff --git a/go.mod b/go.mod
index 98a817c8..b59509da 100644
--- a/go.mod
+++ b/go.mod
@@ -27,7 +27,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/zap v1.20.0
- golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
+ golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
diff --git a/go.sum b/go.sum
index 37e249f4..c23f0fe9 100644
--- a/go.sum
+++ b/go.sum
@@ -51,11 +51,14 @@ github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
-go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
+go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
+go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
+go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
+go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.20.0 h1:N4oPlghZwYG55MlU6LXk/Zp00FVNE9X9wrYO8CEs4lc=
go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
@@ -78,6 +81,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220111092808-5a964db01320 h1:0jf+tOCoZ3LyutmCOWpVni1chK4VfFLhRsDK7MhqGRY=
+golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
diff --git a/transport/interface.go b/ipc/interface.go
index 0d6c8e8b..1d70cd21 100644
--- a/transport/interface.go
+++ b/ipc/interface.go
@@ -1,4 +1,4 @@
-package transport
+package ipc
import (
"context"
diff --git a/transport/pipe/pipe_factory.go b/ipc/pipe/pipe_factory.go
index c70b3f65..4a3c9a67 100755
--- a/transport/pipe/pipe_factory.go
+++ b/ipc/pipe/pipe_factory.go
@@ -7,16 +7,21 @@ import (
"github.com/spiral/goridge/v3/pkg/pipe"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
+ "go.uber.org/zap"
)
// Factory connects to stack using standard
// streams (STDIN, STDOUT pipes).
-type Factory struct{}
+type Factory struct {
+ log *zap.Logger
+}
// NewPipeFactory returns new factory instance and starts
// listening
-func NewPipeFactory() *Factory {
- return &Factory{}
+func NewPipeFactory(log *zap.Logger) *Factory {
+ return &Factory{
+ log: log,
+ }
}
type sr struct {
@@ -29,7 +34,7 @@ type sr struct {
func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) {
spCh := make(chan sr)
go func() {
- w, err := worker.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log))
if err != nil {
select {
case spCh <- sr{
@@ -130,7 +135,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*w
}
func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) {
- w, err := worker.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log))
if err != nil {
return nil, err
}
diff --git a/transport/pipe/pipe_factory_spawn_test.go b/ipc/pipe/pipe_factory_spawn_test.go
index 9aa12564..2ce5a257 100644
--- a/transport/pipe/pipe_factory_spawn_test.go
+++ b/ipc/pipe/pipe_factory_spawn_test.go
@@ -11,12 +11,15 @@ import (
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "go.uber.org/zap"
)
+var log = zap.NewNop()
+
func Test_GetState2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
assert.Equal(t, worker.StateStopped, w.State().Value())
@@ -32,7 +35,7 @@ func Test_GetState2(t *testing.T) {
func Test_Kill2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
@@ -55,7 +58,7 @@ func Test_Kill2(t *testing.T) {
func Test_Pipe_Start2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -73,7 +76,7 @@ func Test_Pipe_StartError2(t *testing.T) {
t.Errorf("error running the command: error %v", err)
}
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -85,7 +88,7 @@ func Test_Pipe_PipeError3(t *testing.T) {
t.Errorf("error creating the STDIN pipe: error %v", err)
}
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -97,28 +100,28 @@ func Test_Pipe_PipeError4(t *testing.T) {
t.Errorf("error creating the STDIN pipe: error %v", err)
}
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
}
func Test_Pipe_Invalid2(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
func Test_Pipe_Echo2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
assert.NoError(t, err)
sw := worker.From(w)
@@ -142,7 +145,7 @@ func Test_Pipe_Echo2(t *testing.T) {
func Test_Pipe_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
assert.NoError(t, err)
require.NotNil(t, w)
@@ -157,7 +160,7 @@ func Test_Pipe_Broken2(t *testing.T) {
}
func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
- f := NewPipeFactory()
+ f := NewPipeFactory(log)
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
w, _ := f.SpawnWorker(cmd)
@@ -177,7 +180,7 @@ func Benchmark_Pipe_SpawnWorker_Stop2(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorker(cmd)
+ w, _ := NewPipeFactory(log).SpawnWorker(cmd)
sw := worker.From(w)
b.ReportAllocs()
@@ -204,7 +207,7 @@ func Benchmark_Pipe_Worker_ExecEcho2(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
if err != nil {
b.Fatal(err)
}
@@ -227,7 +230,7 @@ func Benchmark_Pipe_Worker_ExecEcho4(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
if err != nil {
b.Fatal(err)
}
@@ -251,7 +254,7 @@ func Benchmark_Pipe_Worker_ExecEchoWithoutContext2(b *testing.B) {
func Test_Echo2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -281,7 +284,7 @@ func Test_Echo2(t *testing.T) {
func Test_BadPayload2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorker(cmd)
+ w, _ := NewPipeFactory(log).SpawnWorker(cmd)
sw := worker.From(w)
@@ -305,7 +308,7 @@ func Test_BadPayload2(t *testing.T) {
func Test_String2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorker(cmd)
+ w, _ := NewPipeFactory(log).SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
}()
@@ -324,7 +327,7 @@ func Test_String2(t *testing.T) {
func Test_Echo_Slow2(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
- w, _ := NewPipeFactory().SpawnWorker(cmd)
+ w, _ := NewPipeFactory(log).SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
}()
@@ -350,7 +353,7 @@ func Test_Echo_Slow2(t *testing.T) {
func Test_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ w, err := NewPipeFactory(log).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -367,7 +370,7 @@ func Test_Broken2(t *testing.T) {
func Test_Error2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
- w, _ := NewPipeFactory().SpawnWorker(cmd)
+ w, _ := NewPipeFactory(log).SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
}()
@@ -394,7 +397,7 @@ func Test_Error2(t *testing.T) {
func Test_NumExecs2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorker(cmd)
+ w, _ := NewPipeFactory(log).SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
}()
diff --git a/transport/pipe/pipe_factory_test.go b/ipc/pipe/pipe_factory_test.go
index cbf1431a..025498b5 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/ipc/pipe/pipe_factory_test.go
@@ -19,7 +19,7 @@ func Test_GetState(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
assert.Equal(t, worker.StateStopped, w.State().Value())
@@ -40,7 +40,7 @@ func Test_Kill(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
@@ -65,7 +65,7 @@ func Test_Pipe_Start(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -85,7 +85,7 @@ func Test_Pipe_StartError(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -99,7 +99,7 @@ func Test_Pipe_PipeError(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -114,7 +114,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -123,7 +123,7 @@ func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
@@ -133,7 +133,7 @@ func Test_Pipe_Invalid(t *testing.T) {
t.Parallel()
cmd := exec.Command("php", "../../tests/invalid.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -142,7 +142,7 @@ func Test_Pipe_Echo(t *testing.T) {
t.Parallel()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -174,7 +174,7 @@ func Test_Pipe_Echo_Script(t *testing.T) {
t.Parallel()
cmd := exec.Command("sh", "../../tests/pipes_test_script.sh")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -205,7 +205,7 @@ func Test_Pipe_Broken(t *testing.T) {
t.Parallel()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
require.NoError(t, err)
require.NotNil(t, w)
@@ -225,7 +225,7 @@ func Test_Pipe_Broken(t *testing.T) {
}
func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
- f := NewPipeFactory()
+ f := NewPipeFactory(log)
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd)
@@ -245,7 +245,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd)
+ w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(context.Background(), cmd)
sw := worker.From(w)
b.ReportAllocs()
@@ -273,7 +273,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
@@ -297,7 +297,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
@@ -323,7 +323,7 @@ func Test_Echo(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -354,7 +354,7 @@ func Test_BadPayload(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
sw := worker.From(w)
@@ -381,7 +381,7 @@ func Test_String(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
@@ -402,7 +402,7 @@ func Test_Echo_Slow(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
@@ -429,7 +429,7 @@ func Test_Broken(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -449,7 +449,7 @@ func Test_Error(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
@@ -478,7 +478,7 @@ func Test_NumExecs(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+ w, _ := NewPipeFactory(log).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
diff --git a/transport/socket/socket_factory.go b/ipc/socket/socket_factory.go
index 06d7000d..aa356424 100755
--- a/transport/socket/socket_factory.go
+++ b/ipc/socket/socket_factory.go
@@ -14,6 +14,7 @@ import (
"github.com/spiral/goridge/v3/pkg/socket"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/worker"
+ "go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -28,15 +29,18 @@ type Factory struct {
// sockets which are waiting for process association
relays sync.Map
+
+ log *zap.Logger
}
// NewSocketServer returns Factory attached to a given socket listener.
// tout specifies for how long factory should serve for incoming relay connection
-func NewSocketServer(ls net.Listener, tout time.Duration) *Factory {
+func NewSocketServer(ls net.Listener, tout time.Duration, log *zap.Logger) *Factory {
f := &Factory{
ls: ls,
tout: tout,
relays: sync.Map{},
+ log: log,
}
// Be careful
@@ -87,7 +91,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*w
go func() {
ctxT, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
- w, err := worker.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log))
if err != nil {
select {
case c <- socketSpawn{
@@ -157,7 +161,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*w
}
func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) {
- w, err := worker.InitBaseWorker(cmd)
+ w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log))
if err != nil {
return nil, err
}
diff --git a/transport/socket/socket_factory_spawn_test.go b/ipc/socket/socket_factory_spawn_test.go
index 7fc6f4a5..36c6cce2 100644
--- a/transport/socket/socket_factory_spawn_test.go
+++ b/ipc/socket/socket_factory_spawn_test.go
@@ -12,8 +12,11 @@ import (
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "go.uber.org/zap"
)
+var log = zap.NewNop()
+
func Test_Tcp_Start2(t *testing.T) {
ls, err := net.Listen("tcp", "127.0.0.1:9007")
if assert.NoError(t, err) {
@@ -29,7 +32,7 @@ func Test_Tcp_Start2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -51,7 +54,7 @@ func Test_Tcp_StartCloseFactory2(t *testing.T) {
}
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- f := NewSocketServer(ls, time.Minute)
+ f := NewSocketServer(ls, time.Minute, log)
defer func() {
err = ls.Close()
if err != nil {
@@ -92,7 +95,7 @@ func Test_Tcp_StartError2(t *testing.T) {
t.Errorf("error executing the command: error %v", err)
}
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -112,7 +115,7 @@ func Test_Tcp_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ w, err2 := NewSocketServer(ls, time.Second*5, log).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err2)
}
@@ -132,7 +135,7 @@ func Test_Tcp_Invalid2(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*1).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Second*1, log).SpawnWorker(cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -152,7 +155,7 @@ func Test_Tcp_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -192,7 +195,7 @@ func Test_Tcp_Echo2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, _ := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ w, _ := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
}()
@@ -225,7 +228,7 @@ func Test_Unix_Start2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -249,7 +252,7 @@ func Test_Unix_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Second*5, log).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
}
@@ -264,7 +267,7 @@ func Test_Unix_Timeout2(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
- w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*100, log).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "relay timeout")
@@ -280,7 +283,7 @@ func Test_Unix_Invalid2(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Second*10, log).SpawnWorker(cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -295,7 +298,7 @@ func Test_Unix_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -329,7 +332,7 @@ func Test_Unix_Echo2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
@@ -363,7 +366,7 @@ func Benchmark_Tcp_SpawnWorker_Stop2(b *testing.B) {
assert.NoError(b, err)
}()
- f := NewSocketServer(ls, time.Minute)
+ f := NewSocketServer(ls, time.Minute, log)
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
@@ -392,7 +395,7 @@ func Benchmark_Tcp_Worker_ExecEcho2(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd)
if err != nil {
b.Fatal(err)
}
@@ -428,7 +431,7 @@ func Benchmark_Unix_SpawnWorker_Stop2(b *testing.B) {
b.Skip("socket is busy")
}
- f := NewSocketServer(ls, time.Minute)
+ f := NewSocketServer(ls, time.Minute, log)
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
@@ -461,7 +464,7 @@ func Benchmark_Unix_Worker_ExecEcho2(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorker(cmd)
if err != nil {
b.Fatal(err)
}
diff --git a/transport/socket/socket_factory_test.go b/ipc/socket/socket_factory_test.go
index 5a078be4..9b48d233 100755
--- a/transport/socket/socket_factory_test.go
+++ b/ipc/socket/socket_factory_test.go
@@ -32,7 +32,7 @@ func Test_Tcp_Start(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -56,7 +56,7 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
}
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- f := NewSocketServer(ls, time.Minute)
+ f := NewSocketServer(ls, time.Minute, log)
defer func() {
err = ls.Close()
if err != nil {
@@ -101,7 +101,7 @@ func Test_Tcp_StartError(t *testing.T) {
t.Errorf("error executing the command: error %v", err)
}
- serv := NewSocketServer(ls, time.Minute)
+ serv := NewSocketServer(ls, time.Minute, log)
time.Sleep(time.Second * 2)
w, err := serv.SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
@@ -126,7 +126,7 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err2 := NewSocketServer(ls, time.Second*5, log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
}
@@ -148,7 +148,7 @@ func Test_Tcp_Timeout(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0")
- w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*1, log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
@@ -171,7 +171,7 @@ func Test_Tcp_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*1, log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -193,7 +193,7 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*10, log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -235,7 +235,7 @@ func Test_Tcp_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ w, _ := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
@@ -275,7 +275,7 @@ func Test_Tcp_Echo_Script(t *testing.T) {
cmd := exec.Command("sh", "../../tests/socket_test_script.sh")
- w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ w, _ := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
@@ -314,7 +314,7 @@ func Test_Unix_Start(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -344,7 +344,7 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*5, log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
}
@@ -365,7 +365,7 @@ func Test_Unix_Timeout(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
- w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*100, log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
@@ -387,7 +387,7 @@ func Test_Unix_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*10, log).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -407,7 +407,7 @@ func Test_Unix_Broken(t *testing.T) {
}
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -448,7 +448,7 @@ func Test_Unix_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -488,7 +488,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
b.Skip("socket is busy")
}
- f := NewSocketServer(ls, time.Minute)
+ f := NewSocketServer(ls, time.Minute, log)
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
@@ -523,7 +523,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
@@ -557,7 +557,7 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
b.Skip("socket is busy")
}
- f := NewSocketServer(ls, time.Minute)
+ f := NewSocketServer(ls, time.Minute, log)
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
@@ -588,7 +588,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute, log).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 019c34b2..dfd9ffd3 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -7,8 +7,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/events"
+ "github.com/spiral/roadrunner/v2/ipc"
"github.com/spiral/roadrunner/v2/payload"
- "github.com/spiral/roadrunner/v2/transport"
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
@@ -36,7 +36,7 @@ type StaticPool struct {
cmd Command
// creates and connects to stack
- factory transport.Factory
+ factory ipc.Factory
// manages worker states and TTLs
ww Watcher
@@ -49,7 +49,7 @@ type StaticPool struct {
}
// NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
+func NewStaticPool(ctx context.Context, cmd Command, factory ipc.Factory, cfg *Config, options ...Options) (Pool, error) {
if factory == nil {
return nil, errors.Str("no factory initialized")
}
@@ -303,7 +303,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
+func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory ipc.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 4f98ca91..5db2bd86 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -2,7 +2,7 @@ package pool
import (
"context"
- "log"
+ l "log"
"os/exec"
"runtime"
"strconv"
@@ -11,8 +11,8 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/ipc/pipe"
"github.com/spiral/roadrunner/v2/payload"
- "github.com/spiral/roadrunner/v2/transport/pipe"
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
@@ -26,12 +26,14 @@ var cfg = &Config{
DestroyTimeout: time.Second * 500,
}
+var log = zap.NewNop()
+
func Test_NewPool(t *testing.T) {
ctx := context.Background()
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -46,7 +48,7 @@ func Test_NewPoolReset(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -73,7 +75,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
@@ -85,7 +87,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -102,7 +104,7 @@ func Test_StaticPool_Echo(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -126,7 +128,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -150,7 +152,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -174,7 +176,7 @@ func Test_StaticPool_JobError(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -203,7 +205,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
WithLogger(z),
)
@@ -230,7 +232,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg2,
)
assert.NoError(t, err)
@@ -268,7 +270,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
@@ -287,7 +289,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
MaxJobs: 1,
@@ -326,7 +328,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
Debug: true,
AllocateTimeout: time.Second,
@@ -368,7 +370,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -409,7 +411,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -431,7 +433,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -461,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 5,
AllocateTimeout: time.Second * 100,
@@ -486,7 +488,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -507,7 +509,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx,
// sleep for the 3 seconds
func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
Debug: false,
NumWorkers: 1,
@@ -538,7 +540,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -555,7 +557,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -572,7 +574,7 @@ func Test_CRC_WithPayload(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.Error(t, err)
@@ -604,7 +606,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
if err != nil {
@@ -636,7 +638,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
@@ -664,7 +666,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
defer wg.Done()
if _, err := p.Exec(pld); err != nil {
b.Fail()
- log.Println(err)
+ l.Println(err)
}
}()
}
@@ -678,7 +680,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
MaxJobs: 1,
@@ -694,7 +696,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
for n := 0; n < b.N; n++ {
if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
- log.Println(err)
+ l.Println(err)
}
}
}
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 6ff62316..a479671f 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -7,8 +7,8 @@ import (
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/ipc/pipe"
"github.com/spiral/roadrunner/v2/payload"
- "github.com/spiral/roadrunner/v2/transport/pipe"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -32,7 +32,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgSupervised,
)
@@ -62,7 +62,7 @@ func Test_SupervisedPoolReset(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgSupervised,
)
assert.NoError(t, err)
@@ -93,7 +93,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgSupervised,
)
@@ -131,7 +131,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -166,7 +166,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -223,7 +223,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -273,7 +273,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -321,7 +321,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -367,7 +367,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -402,7 +402,7 @@ func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
diff --git a/worker/worker.go b/worker/worker.go
index 8ca55a3b..52bdbacb 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -62,7 +62,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
}
if w.log == nil {
- z, err := zap.NewDevelopment()
+ z, err := zap.NewProduction()
if err != nil {
return nil, err
}