summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/linters.yml2
-rw-r--r--CHANGELOG.md13
-rw-r--r--go.mod5
-rw-r--r--go.sum41
-rwxr-xr-xpool/static_pool.go61
-rwxr-xr-xpool/static_pool_test.go84
-rwxr-xr-xpool/supervisor_pool.go17
-rw-r--r--pool/supervisor_test.go29
-rw-r--r--transport/pipe/pipe_factory_spawn_test.go26
-rwxr-xr-xtransport/pipe/pipe_factory_test.go23
-rw-r--r--transport/socket/socket_factory_spawn_test.go46
-rwxr-xr-xtransport/socket/socket_factory_test.go47
-rw-r--r--utils/convert.go25
-rwxr-xr-xutils/isolate.go60
-rwxr-xr-xutils/isolate_win.go18
-rwxr-xr-xutils/network.go108
-rwxr-xr-xutils/network_windows.go64
-rw-r--r--utils/race_checker.go35
-rw-r--r--utils/race_checker_unsafe.go5
-rw-r--r--utils/to_ptr.go467
-rwxr-xr-xworker/worker.go51
-rwxr-xr-xworker_watcher/worker_watcher.go38
22 files changed, 958 insertions, 307 deletions
diff --git a/.github/workflows/linters.yml b/.github/workflows/linters.yml
index 0c7620d8..ba6d0b9f 100644
--- a/.github/workflows/linters.yml
+++ b/.github/workflows/linters.yml
@@ -15,4 +15,4 @@ jobs:
with:
version: v1.43 # without patch version
only-new-issues: false # show only new issues if it's a pull request
- args: --timeout=10m
+ args: --timeout=10m --build-tags=race
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a15c2d2e..2bc56a72 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,18 @@
# CHANGELOG
+# v2.7.0 (?.?.2022)
+
+## 👀 New:
+
+- ✏️ RR `workers pool`, `worker`, `worker_watcher` now has their own log levels. `stderr/stdout` logged as before at the `info` log level. All other messages moved to the `debug` log level except a few events from the `worker_watcher` when RR can't allocate the new worker which are moved to the `warn`.
+- ✏️ Use the common logger for the whole roadrunner-sdk and roadrunner-plugins.
+
+## 🩹 Fixes:
+
+- 🐛 Fix: RR may have missed the message from the `stderr` when the PHP script failed to start immediately after RR starts.
+
+---
+
# v2.6.3 (23.12.2021)
## 👀 New:
diff --git a/go.mod b/go.mod
index c5547a85..7971cc66 100644
--- a/go.mod
+++ b/go.mod
@@ -18,15 +18,16 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
- github.com/kr/pretty v0.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
+ github.com/spiral/tcplisten v1.0.0
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
- golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed // indirect
+ go.uber.org/zap v1.19.1
+ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
diff --git a/go.sum b/go.sum
index cf51378e..4e748829 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,5 @@
+github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
+github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -20,6 +22,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
@@ -28,6 +32,8 @@ github.com/spiral/errors v1.0.12 h1:38Waf8ZL/Xvxg4HTYGmrUbvi7TCHivmuatNQZlBhQ8s=
github.com/spiral/errors v1.0.12/go.mod h1:j5UReqxZxfkwXkI9mFY87VhEXcXmSg7kAk5Sswy1eEA=
github.com/spiral/goridge/v3 v3.2.4 h1:Octzn0kgpQY10vQhrf0a5RlzCVW1lIEuR6/IGYp0BW4=
github.com/spiral/goridge/v3 v3.2.4/go.mod h1:a6qAtZy+FBaPj/76GweHj6SkgIr+oRVgW5p4e5vLZF4=
+github.com/spiral/tcplisten v1.0.0 h1:dII3R20Xslll6Uk60ac1JCn9zQwfwbt88CLrs3OryZg=
+github.com/spiral/tcplisten v1.0.0/go.mod h1:+anIsZh2ZYw2EogG2pO1yEZKcGN7lEf41hUQilctYJo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@@ -39,25 +45,60 @@ github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2bi
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
+github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
+go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
+go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
+go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed h1:d5glpD+GMms2DMbu1doSYibjbKasYNvnhq885nOnRz8=
golang.org/x/sys v0.0.0-20211214170744-3b038e5940ed/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
+golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 7481f84f..019c34b2 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -2,7 +2,6 @@ package pool
import (
"context"
- "fmt"
"os/exec"
"time"
@@ -13,13 +12,12 @@ import (
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
+ "go.uber.org/zap"
)
const (
// StopRequest can be sent by worker to indicate that restart is required.
StopRequest = `{"stop":true}`
- // pluginName ...
- pluginName = "pool"
)
// ErrorEncoder encode error or make a decision based on the error type
@@ -32,6 +30,7 @@ type Command func() *exec.Cmd
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
cfg *Config
+ log *zap.Logger
// worker command creator
cmd Command
@@ -39,9 +38,6 @@ type StaticPool struct {
// creates and connects to stack
factory transport.Factory
- events events.EventBus
- eventsID string
-
// manages worker states and TTLs
ww Watcher
@@ -52,8 +48,8 @@ type StaticPool struct {
errEncoder ErrorEncoder
}
-// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
+// NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
+func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
if factory == nil {
return nil, errors.Str("no factory initialized")
}
@@ -64,13 +60,10 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
cfg.MaxJobs = 1
}
- eb, id := events.Bus()
p := &StaticPool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- events: eb,
- eventsID: id,
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
}
// add pool options
@@ -78,10 +71,19 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
options[i](p)
}
+ if p.log == nil {
+ z, err := zap.NewProduction()
+ if err != nil {
+ return nil, err
+ }
+
+ p.log = z
+ }
+
// set up workers allocator
p.allocator = p.newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
// set up workers watcher
- p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
+ p.ww = workerWatcher.NewSyncWorkerWatcher(p.allocator, p.log, p.cfg.NumWorkers, p.cfg.AllocateTimeout)
// allocate requested number of workers
workers, err := p.allocateWorkers(p.cfg.NumWorkers)
@@ -99,7 +101,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
// if supervised config not nil, guess, that pool wanted to be supervised
if cfg.Supervisor != nil {
- sp := supervisorWrapper(p, eb, p.cfg.Supervisor)
+ sp := supervisorWrapper(p, p.log, p.cfg.Supervisor)
// start watcher timer
sp.Start()
return sp, nil
@@ -108,6 +110,12 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
return p, nil
}
+func WithLogger(z *zap.Logger) Options {
+ return func(p *StaticPool) {
+ p.log = z
+ }
+}
+
// GetConfig returns associated pool configuration. Immutable.
func (sp *StaticPool) GetConfig() interface{} {
return sp.cfg
@@ -158,7 +166,6 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
- sp.events.Unsubscribe(sp.eventsID)
sp.ww.Destroy(ctx)
}
@@ -183,13 +190,12 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
- sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
+ sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "execTTL timeout elapsed"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventExecTTL.String()), zap.Error(err))
w.State().Set(worker.StateInvalid)
return nil, err
case errors.Is(errors.SoftJob, err):
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+ sp.log.Warn("worker stopped, and will be restarted", zap.String("reason", "worker error"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
// if max jobs exceed
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
// mark old as invalid and stop
@@ -209,15 +215,14 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
case errors.Is(errors.Network, err):
// in case of network error, we can't stop the worker, we should kill it
w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
+ sp.log.Warn("network error, worker will be restarted", zap.String("reason", "network"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
// kill the worker instead of sending net packet to it
_ = w.Kill()
return nil, err
default:
w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+ sp.log.Warn("worker will be restarted", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err))
// stop the worker, worker here might be in the broken state (network)
errS := w.Stop()
if errS != nil {
@@ -268,7 +273,7 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %v, worker's pid: %d", err.Error(), w.Pid())))
+ sp.log.Warn("user requested worker to be stopped", zap.String("reason", "user event"), zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
}
}
@@ -289,7 +294,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
- sp.events.Send(events.NewEvent(events.EventNoFreeWorkers, pluginName, fmt.Sprintf("no free workers in the pool, error: %s", err)))
+ sp.log.Error("no free workers in the pool, wait timeout exceed", zap.String("reason", "no free workers"), zap.String("internal_event_name", events.EventNoFreeWorkers.String()), zap.Error(err))
return nil, errors.E(op, err)
}
// else if err not nil - return error
@@ -310,7 +315,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// wrap sync worker
sw := worker.From(w)
- sp.events.Send(events.NewEvent(events.EventWorkerConstruct, pluginName, fmt.Sprintf("worker allocated, pid: %d", sw.Pid())))
+ sp.log.Debug("worker is allocated", zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerConstruct.String()))
return sw, nil
}
}
@@ -336,7 +341,7 @@ func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
// destroy the worker
err = sw.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+ sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
return nil, err
}
@@ -363,7 +368,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
err = sw.Stop()
if err != nil {
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, sw.Pid())))
+ sp.log.Debug("debug mode: worker stopped", zap.String("reason", "worker error"), zap.Int64("pid", sw.Pid()), zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
return nil, err
}
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index a45aa29d..4f98ca91 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -6,19 +6,18 @@ import (
"os/exec"
"runtime"
"strconv"
- "strings"
"sync"
"testing"
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/transport/pipe"
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "go.uber.org/zap"
)
var cfg = &Config{
@@ -29,7 +28,7 @@ var cfg = &Config{
func Test_NewPool(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -44,7 +43,7 @@ func Test_NewPool(t *testing.T) {
func Test_NewPoolReset(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -71,7 +70,7 @@ func Test_NewPoolReset(t *testing.T) {
}
func Test_StaticPool_Invalid(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") },
pipe.NewPipeFactory(),
@@ -83,7 +82,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
}
func Test_ConfigNoErrorInitDefaults(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -100,7 +99,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
func Test_StaticPool_Echo(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -124,7 +123,7 @@ func Test_StaticPool_Echo(t *testing.T) {
func Test_StaticPool_Echo_NilContext(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -148,7 +147,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
func Test_StaticPool_Echo_Context(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") },
pipe.NewPipeFactory(),
@@ -172,7 +171,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
func Test_StaticPool_JobError(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") },
pipe.NewPipeFactory(),
@@ -198,17 +197,15 @@ func Test_StaticPool_JobError(t *testing.T) {
func Test_StaticPool_Broken_Replace(t *testing.T) {
ctx := context.Background()
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
+ z, err := zap.NewProduction()
require.NoError(t, err)
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
pipe.NewPipeFactory(),
cfg,
+ WithLogger(z),
)
assert.NoError(t, err)
assert.NotNil(t, p)
@@ -218,31 +215,19 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- event := <-ch
- if !strings.Contains(event.Message(), "undefined_function()") {
- t.Fatal("event should contain undefiled function()")
- }
-
p.Destroy(ctx)
}
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx := context.Background()
- // Run pool events
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "pool.EventWorkerConstruct", ch)
- require.NoError(t, err)
-
var cfg2 = &Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
}
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -264,7 +249,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.Equal(t, 1, len(p.Workers()))
// first creation
- <-ch
+ time.Sleep(time.Second * 2)
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill()
if err != nil {
@@ -272,8 +257,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
// re-creation
- <-ch
-
+ time.Sleep(time.Second * 2)
list := p.Workers()
for _, w := range list {
assert.Equal(t, worker.StateReady, w.State().Value())
@@ -281,7 +265,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
func Test_StaticPool_AllocateTimeout(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
@@ -300,7 +284,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
@@ -339,7 +323,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
func Test_StaticPool_Debug_Worker(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
pipe.NewPipeFactory(),
@@ -381,7 +365,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") },
pipe.NewPipeFactory(),
@@ -422,7 +406,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
@@ -444,7 +428,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
pipe.NewPipeFactory(),
@@ -474,7 +458,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Handle_Dead(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -499,7 +483,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_Slow_Destroy(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -519,13 +503,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx := context.Background()
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "pool.EventNoFreeWorkers", ch)
- require.NoError(t, err)
-
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
// sleep for the 3 seconds
func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
@@ -550,14 +528,14 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- <-ch
+ time.Sleep(time.Second)
p.Destroy(ctx)
}
// identical to replace but controlled on worker side
func Test_Static_Pool_WrongCommand1(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -574,7 +552,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
// identical to replace but controlled on worker side
func Test_Static_Pool_WrongCommand2(t *testing.T) {
- p, err := Initialize(
+ p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -591,7 +569,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
func Test_CRC_WithPayload(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") },
pipe.NewPipeFactory(),
@@ -623,7 +601,7 @@ Benchmark_Pool_Echo-32 54374 27776 ns/op 7947 B/op 19 allo
*/
func Benchmark_Pool_Echo(b *testing.B) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -655,7 +633,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
// PTR -> Benchmark_Pool_Echo_Batched-32 413312 2904 ns/op 1067 B/op 23 allocs/op
func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -697,7 +675,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
// Benchmark_Pool_Echo_Replaced-32 104/100 10900218 ns/op 52365 B/op 125 allocs/op
func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 0502dc9a..59834859 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -2,7 +2,6 @@ package pool
import (
"context"
- "fmt"
"sync"
"time"
@@ -11,11 +10,11 @@ import (
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/state/process"
"github.com/spiral/roadrunner/v2/worker"
+ "go.uber.org/zap"
)
const (
- MB = 1024 * 1024
- supervisorName string = "supervisor"
+ MB = 1024 * 1024
)
// NSEC_IN_SEC nanoseconds in second
@@ -29,17 +28,17 @@ type Supervised interface {
type supervised struct {
cfg *SupervisorConfig
- events events.EventBus
pool Pool
+ log *zap.Logger
stopCh chan struct{}
mu *sync.RWMutex
}
-func supervisorWrapper(pool Pool, eb events.EventBus, cfg *SupervisorConfig) Supervised {
+func supervisorWrapper(pool Pool, log *zap.Logger, cfg *SupervisorConfig) *supervised {
sp := &supervised{
cfg: cfg,
- events: eb,
pool: pool,
+ log: log,
mu: &sync.RWMutex{},
stopCh: make(chan struct{}),
}
@@ -166,7 +165,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventTTL, supervisorName, fmt.Sprintf("ttl reached, worker's pid: %d", workers[i].Pid())))
+ sp.log.Debug("ttl", zap.String("reason", "ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String()))
continue
}
@@ -186,7 +185,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventMaxMemory, supervisorName, fmt.Sprintf("max memory reached, worker's pid: %d", workers[i].Pid())))
+ sp.log.Debug("memory_limit", zap.String("reason", "max memory is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventMaxMemory.String()))
continue
}
@@ -241,7 +240,7 @@ func (sp *supervised) control() { //nolint:gocognit
}
// just to double-check
workers[i].State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventIdleTTL, supervisorName, fmt.Sprintf("idle ttl reached, worker's pid: %d", workers[i].Pid())))
+ sp.log.Debug("idle_ttl", zap.String("reason", "idle ttl is reached"), zap.Int64("pid", workers[i].Pid()), zap.String("internal_event_name", events.EventTTL.String()))
}
}
}
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 6e8ab552..6ff62316 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -7,7 +7,6 @@ import (
"testing"
"time"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/transport/pipe"
"github.com/spiral/roadrunner/v2/worker"
@@ -30,7 +29,7 @@ var cfgSupervised = &Config{
func TestSupervisedPool_Exec(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
pipe.NewPipeFactory(),
@@ -60,7 +59,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
func Test_SupervisedPoolReset(t *testing.T) {
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
@@ -91,7 +90,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
cfgSupervised.Debug = true
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") },
pipe.NewPipeFactory(),
@@ -129,7 +128,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
pipe.NewPipeFactory(),
@@ -164,7 +163,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") },
pipe.NewPipeFactory(),
@@ -221,7 +220,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") },
pipe.NewPipeFactory(),
@@ -271,7 +270,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
pipe.NewPipeFactory(),
@@ -319,7 +318,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
},
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
pipe.NewPipeFactory(),
@@ -361,17 +360,11 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
},
}
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "supervisor.EventMaxMemory", ch)
- require.NoError(t, err)
-
// constructed
// max memory
// constructed
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
pipe.NewPipeFactory(),
@@ -390,7 +383,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
assert.Empty(t, resp.Body)
assert.Empty(t, resp.Context)
- <-ch
+ time.Sleep(time.Second)
p.Destroy(context.Background())
}
@@ -406,7 +399,7 @@ func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
}
ctx := context.Background()
- p, err := Initialize(
+ p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") },
pipe.NewPipeFactory(),
diff --git a/transport/pipe/pipe_factory_spawn_test.go b/transport/pipe/pipe_factory_spawn_test.go
index 7e04f113..9aa12564 100644
--- a/transport/pipe/pipe_factory_spawn_test.go
+++ b/transport/pipe/pipe_factory_spawn_test.go
@@ -2,13 +2,11 @@ package pipe
import (
"os/exec"
- "strings"
"sync"
"testing"
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
@@ -106,21 +104,9 @@ func Test_Pipe_PipeError4(t *testing.T) {
func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
-
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewPipeFactory().SpawnWorker(cmd)
-
assert.Nil(t, w)
assert.Error(t, err)
- ev := <-ch
- if !strings.Contains(ev.Message(), "failboot") {
- t.Fatal("should contain failboot string")
- }
}
func Test_Pipe_Invalid2(t *testing.T) {
@@ -364,29 +350,17 @@ func Test_Echo_Slow2(t *testing.T) {
func Test_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewPipeFactory().SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
}
sw := worker.From(w)
-
res, err := sw.Exec(&payload.Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res)
time.Sleep(time.Second * 3)
-
- msg := <-ch
- if strings.ContainsAny(msg.Message(), "undefined_function()") == false {
- t.Fail()
- }
assert.Error(t, w.Stop())
}
diff --git a/transport/pipe/pipe_factory_test.go b/transport/pipe/pipe_factory_test.go
index c69be298..cbf1431a 100755
--- a/transport/pipe/pipe_factory_test.go
+++ b/transport/pipe/pipe_factory_test.go
@@ -3,13 +3,11 @@ package pipe
import (
"context"
"os/exec"
- "strings"
"sync"
"testing"
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
@@ -125,21 +123,10 @@ func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
-
- ev := <-ch
- if !strings.Contains(ev.Message(), "failboot") {
- t.Fatal("should contain failboot string")
- }
}
func Test_Pipe_Invalid(t *testing.T) {
@@ -442,12 +429,6 @@ func Test_Broken(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err := eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
@@ -460,10 +441,6 @@ func Test_Broken(t *testing.T) {
assert.Nil(t, res)
time.Sleep(time.Second * 3)
- msg := <-ch
- if strings.ContainsAny(msg.Message(), "undefined_function()") == false {
- t.Fail()
- }
assert.Error(t, w.Stop())
}
diff --git a/transport/socket/socket_factory_spawn_test.go b/transport/socket/socket_factory_spawn_test.go
index fd852080..7fc6f4a5 100644
--- a/transport/socket/socket_factory_spawn_test.go
+++ b/transport/socket/socket_factory_spawn_test.go
@@ -3,13 +3,11 @@ package socket
import (
"net"
"os/exec"
- "strings"
"sync"
"syscall"
"testing"
"time"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
@@ -114,20 +112,9 @@ func Test_Tcp_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err2)
-
- ev := <-ch
- if !strings.Contains(ev.Message(), "failboot") {
- t.Fatal("should contain failboot string")
- }
}
func Test_Tcp_Invalid2(t *testing.T) {
@@ -165,12 +152,6 @@ func Test_Tcp_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
@@ -189,11 +170,6 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Nil(t, res)
wg.Wait()
- ev := <-ch
- if !strings.Contains(ev.Message(), "undefined_function()") {
- t.Fatal("should contain undefined_function() string")
- }
-
time.Sleep(time.Second)
err2 := w.Stop()
// write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
@@ -273,20 +249,9 @@ func Test_Unix_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
assert.Nil(t, w)
assert.Error(t, err)
-
- ev := <-ch
- if !strings.Contains(ev.Message(), "failboot") {
- t.Fatal("should contain failboot string")
- }
}
func Test_Unix_Timeout2(t *testing.T) {
@@ -330,12 +295,6 @@ func Test_Unix_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
if err != nil {
t.Fatal(err)
@@ -355,11 +314,6 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Nil(t, res)
wg.Wait()
- ev := <-ch
- if !strings.Contains(ev.Message(), "undefined_function()") {
- t.Fatal("should contain undefined_function string")
- }
-
time.Sleep(time.Second)
err = w.Stop()
assert.NoError(t, err)
diff --git a/transport/socket/socket_factory_test.go b/transport/socket/socket_factory_test.go
index 10885bac..5a078be4 100755
--- a/transport/socket/socket_factory_test.go
+++ b/transport/socket/socket_factory_test.go
@@ -4,12 +4,10 @@ import (
"context"
"net"
"os/exec"
- "strings"
"sync"
"testing"
"time"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/payload"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
@@ -128,20 +126,9 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
-
- ev := <-ch
- if !strings.Contains(ev.Message(), "failboot") {
- t.Fatal("should contain failboot string")
- }
}
func Test_Tcp_Timeout(t *testing.T) {
@@ -206,12 +193,6 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
@@ -230,11 +211,6 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Nil(t, res)
wg.Wait()
- ev := <-ch
- if !strings.Contains(ev.Message(), "undefined_function()") {
- t.Fatal("should contain undefined_function string")
- }
-
time.Sleep(time.Second)
err2 := w.Stop()
// write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
@@ -368,20 +344,9 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
-
- ev := <-ch
- if !strings.Contains(ev.Message(), "failboot") {
- t.Fatal("should contain failboot string")
- }
}
func Test_Unix_Timeout(t *testing.T) {
@@ -442,13 +407,6 @@ func Test_Unix_Broken(t *testing.T) {
}
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
-
- eb, id := events.Bus()
- defer eb.Unsubscribe(id)
- ch := make(chan events.Event, 10)
- err = eb.SubscribeP(id, "worker.EventWorkerStderr", ch)
- require.NoError(t, err)
-
w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
@@ -467,11 +425,6 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res)
- ev := <-ch
- if !strings.Contains(ev.Message(), "undefined_function()") {
- t.Fatal("should contain undefined_function string")
- }
-
time.Sleep(time.Second)
err = w.Stop()
assert.NoError(t, err)
diff --git a/utils/convert.go b/utils/convert.go
index 8728ad1f..d96acfbb 100644
--- a/utils/convert.go
+++ b/utils/convert.go
@@ -5,6 +5,24 @@ import (
"unsafe"
)
+// AsBytes returns a slice that refers to the data backing the string s.
+func AsBytes(s string) []byte {
+ // get the pointer to the data of the string
+ p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)
+
+ var b []byte
+ hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+ hdr.Data = uintptr(p)
+ // we need to set the cap and len for the string to byte convert
+ // because string is shorter than []bytes
+ hdr.Cap = len(s)
+ hdr.Len = len(s)
+
+ // checker to check mutable access to the data
+ SetChecker(b)
+ return b
+}
+
// AsString returns a string that refers to the data backing the slice s.
func AsString(b []byte) string {
p := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)
@@ -14,10 +32,7 @@ func AsString(b []byte) string {
hdr.Data = uintptr(p)
hdr.Len = len(b)
+ // checker to check mutable access to the data
+ SetChecker(b)
return s
}
-
-// Uint64 returns a pointer value for the uint64 value passed in.
-func Uint64(v uint64) *uint64 {
- return &v
-}
diff --git a/utils/isolate.go b/utils/isolate.go
new file mode 100755
index 00000000..202f538c
--- /dev/null
+++ b/utils/isolate.go
@@ -0,0 +1,60 @@
+//go:build !windows
+// +build !windows
+
+package utils
+
+import (
+ "fmt"
+ "os"
+ "os/exec"
+ "os/user"
+ "strconv"
+ "syscall"
+
+ "github.com/spiral/errors"
+)
+
+// IsolateProcess change gpid for the process to avoid bypassing signals to php processes.
+func IsolateProcess(cmd *exec.Cmd) {
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pgid: 0}
+}
+
+// ExecuteFromUser may work only if run RR under root user
+func ExecuteFromUser(cmd *exec.Cmd, u string) error {
+ const op = errors.Op("execute_from_user")
+ usr, err := user.Lookup(u)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ usrI32, err := strconv.ParseInt(usr.Uid, 10, 32)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ grI32, err := strconv.ParseInt(usr.Gid, 10, 32)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // For more information:
+ // https://www.man7.org/linux/man-pages/man7/user_namespaces.7.html
+ // https://www.man7.org/linux/man-pages/man7/namespaces.7.html
+ if _, err := os.Stat("/proc/self/ns/user"); err != nil {
+ if os.IsNotExist(err) {
+ return fmt.Errorf("kernel doesn't support user namespaces")
+ }
+ if os.IsPermission(err) {
+ return fmt.Errorf("unable to test user namespaces due to permissions")
+ }
+
+ return errors.E(op, errors.Errorf("failed to stat /proc/self/ns/user: %v", err))
+ }
+
+ cmd.SysProcAttr.Credential = &syscall.Credential{
+ Uid: uint32(usrI32),
+ Gid: uint32(grI32),
+ }
+
+ return nil
+}
diff --git a/utils/isolate_win.go b/utils/isolate_win.go
new file mode 100755
index 00000000..6b6d22e0
--- /dev/null
+++ b/utils/isolate_win.go
@@ -0,0 +1,18 @@
+//go:build windows
+// +build windows
+
+package utils
+
+import (
+ "os/exec"
+ "syscall"
+)
+
+// IsolateProcess change gpid for the process to avoid bypassing signals to php processes.
+func IsolateProcess(cmd *exec.Cmd) {
+ cmd.SysProcAttr = &syscall.SysProcAttr{CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP}
+}
+
+func ExecuteFromUser(cmd *exec.Cmd, u string) error {
+ return nil
+}
diff --git a/utils/network.go b/utils/network.go
new file mode 100755
index 00000000..d9269269
--- /dev/null
+++ b/utils/network.go
@@ -0,0 +1,108 @@
+//go:build linux || darwin || freebsd
+// +build linux darwin freebsd
+
+package utils
+
+import (
+ "fmt"
+ "net"
+ "os"
+ "strings"
+ "syscall"
+
+ "github.com/spiral/tcplisten"
+)
+
+const (
+ IPV4 string = "tcp4"
+ IPV6 string = "tcp6"
+)
+
+// CreateListener
+// - SO_REUSEPORT. This option allows linear scaling server performance
+// on multi-CPU servers.
+// See https://www.nginx.com/blog/socket-sharding-nginx-release-1-9-1/ for details.
+//
+// - TCP_DEFER_ACCEPT. This option expects the server reads from the accepted
+// connection before writing to them.
+//
+// - TCP_FASTOPEN. See https://lwn.net/Articles/508865/ for details.
+// CreateListener crates socket listener based on DSN definition.
+func CreateListener(address string) (net.Listener, error) {
+ dsn := strings.Split(address, "://")
+
+ switch len(dsn) {
+ case 1:
+ // assume, that there is no prefix here [127.0.0.1:8000]
+ return createTCPListener(dsn[0])
+ case 2:
+ // we got two part here, first part is the transport, second - address
+ // [tcp://127.0.0.1:8000] OR [unix:///path/to/unix.socket] OR [error://path]
+ // where error is wrong transport name
+ switch dsn[0] {
+ case "unix":
+ // check of file exist. If exist, unlink
+ if fileExists(dsn[1]) {
+ err := syscall.Unlink(dsn[1])
+ if err != nil {
+ return nil, fmt.Errorf("error during the unlink syscall: error %w", err)
+ }
+ }
+ return net.Listen(dsn[0], dsn[1])
+ case "tcp":
+ return createTCPListener(dsn[1])
+ // not an tcp or unix
+ default:
+ return nil, fmt.Errorf("invalid Protocol ([tcp://]:6001, unix://file.sock), address: %s", address)
+ }
+ // wrong number of split parts
+ default:
+ return nil, fmt.Errorf("wrong number of parsed protocol parts, address: %s", address)
+ }
+}
+
+func createTCPListener(addr string) (net.Listener, error) {
+ cfg := tcplisten.Config{
+ ReusePort: true,
+ DeferAccept: false,
+ FastOpen: true,
+ }
+
+ /*
+ Options we may have here:
+ 1. [::1]:8080 //ipv6
+ 2. [0:0:..]:8080 //ipv6
+ 3. 127.0.0.1:8080 //ipv4
+ 4. :8080 //ipv4
+ 5. [::]:8080 //ipv6
+ */
+ host, _, err := net.SplitHostPort(addr)
+ if err != nil {
+ return nil, err
+ }
+
+ // consider this is IPv4
+ if host == "" {
+ return cfg.NewListener(IPV4, addr)
+ }
+
+ return cfg.NewListener(netw(net.ParseIP(host)), addr)
+}
+
+// check if we are listening on the ipv6 or ipv4 address
+func netw(addr net.IP) string {
+ if addr.To4() == nil {
+ return IPV6
+ }
+ return IPV4
+}
+
+// fileExists checks if a file exists and is not a directory before we
+// try using it to prevent further errors.
+func fileExists(filename string) bool {
+ info, err := os.Stat(filename)
+ if os.IsNotExist(err) {
+ return false
+ }
+ return !info.IsDir()
+}
diff --git a/utils/network_windows.go b/utils/network_windows.go
new file mode 100755
index 00000000..88e0fdb6
--- /dev/null
+++ b/utils/network_windows.go
@@ -0,0 +1,64 @@
+//go:build windows
+// +build windows
+
+package utils
+
+import (
+ "fmt"
+ "net"
+ "os"
+ "strings"
+ "syscall"
+)
+
+// CreateListener crates socket listener based on DSN definition.
+func CreateListener(address string) (net.Listener, error) {
+ dsn := strings.Split(address, "://")
+
+ switch len(dsn) {
+ case 1:
+ // assume, that there is no prefix here [127.0.0.1:8000]
+ return createTCPListener(dsn[0])
+ case 2:
+ // we got two part here, first part is the transport, second - address
+ // [tcp://127.0.0.1:8000] OR [unix:///path/to/unix.socket] OR [error://path]
+ // where error is wrong transport name
+ switch dsn[0] {
+ case "unix":
+ // check of file exist. If exist, unlink
+ if fileExists(dsn[1]) {
+ err := syscall.Unlink(dsn[1])
+ if err != nil {
+ return nil, fmt.Errorf("error during the unlink syscall: error %v", err)
+ }
+ }
+ return net.Listen(dsn[0], dsn[1])
+ case "tcp":
+ return createTCPListener(dsn[1])
+ // not an tcp or unix
+ default:
+ return nil, fmt.Errorf("invalid Protocol ([tcp://]:6001, unix://file.sock), address: %s", address)
+ }
+ // wrong number of split parts
+ default:
+ return nil, fmt.Errorf("wrong number of parsed protocol parts, address: %s", address)
+ }
+}
+
+func createTCPListener(addr string) (net.Listener, error) {
+ listener, err := net.Listen("tcp", addr)
+ if err != nil {
+ return nil, err
+ }
+ return listener, nil
+}
+
+// fileExists checks if a file exists and is not a directory before we
+// try using it to prevent further errors.
+func fileExists(filename string) bool {
+ info, err := os.Stat(filename)
+ if os.IsNotExist(err) {
+ return false
+ }
+ return !info.IsDir()
+}
diff --git a/utils/race_checker.go b/utils/race_checker.go
new file mode 100644
index 00000000..cd5ed556
--- /dev/null
+++ b/utils/race_checker.go
@@ -0,0 +1,35 @@
+//go:build race
+
+package utils
+
+import (
+ "crypto/sha512"
+ "fmt"
+ "runtime"
+)
+
+func SetChecker(b []byte) {
+ if len(b) == 0 {
+ return
+ }
+ c := checkIfConst(b)
+ go c.isStillConst()
+ runtime.SetFinalizer(c, (*constSlice).isStillConst)
+}
+
+type constSlice struct {
+ b []byte
+ checksum [64]byte
+}
+
+func checkIfConst(b []byte) *constSlice {
+ c := &constSlice{b: b}
+ c.checksum = sha512.Sum512(c.b)
+ return c
+}
+
+func (c *constSlice) isStillConst() {
+ if sha512.Sum512(c.b) != c.checksum {
+ panic(fmt.Sprintf("mutable access detected 0x%012x", &c.b[0]))
+ }
+}
diff --git a/utils/race_checker_unsafe.go b/utils/race_checker_unsafe.go
new file mode 100644
index 00000000..d2b622a5
--- /dev/null
+++ b/utils/race_checker_unsafe.go
@@ -0,0 +1,5 @@
+//go:build !race
+
+package utils
+
+func SetChecker(_ []byte) {}
diff --git a/utils/to_ptr.go b/utils/to_ptr.go
new file mode 100644
index 00000000..7c93ef46
--- /dev/null
+++ b/utils/to_ptr.go
@@ -0,0 +1,467 @@
+package utils
+
+import "time"
+
+// Bool returns a pointer value for the bool value passed in.
+func Bool(v bool) *bool {
+ return &v
+}
+
+// BoolSlice returns a slice of bool pointers from the values
+// passed in.
+func BoolSlice(vs []bool) []*bool {
+ ps := make([]*bool, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// BoolMap returns a map of bool pointers from the values
+// passed in.
+func BoolMap(vs map[string]bool) map[string]*bool {
+ ps := make(map[string]*bool, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Byte returns a pointer value for the byte value passed in.
+func Byte(v byte) *byte {
+ return &v
+}
+
+// ByteSlice returns a slice of byte pointers from the values
+// passed in.
+func ByteSlice(vs []byte) []*byte {
+ ps := make([]*byte, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// ByteMap returns a map of byte pointers from the values
+// passed in.
+func ByteMap(vs map[string]byte) map[string]*byte {
+ ps := make(map[string]*byte, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// String returns a pointer value for the string value passed in.
+func String(v string) *string {
+ return &v
+}
+
+// StringSlice returns a slice of string pointers from the values
+// passed in.
+func StringSlice(vs []string) []*string {
+ ps := make([]*string, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// StringMap returns a map of string pointers from the values
+// passed in.
+func StringMap(vs map[string]string) map[string]*string {
+ ps := make(map[string]*string, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Int returns a pointer value for the int value passed in.
+func Int(v int) *int {
+ return &v
+}
+
+// IntSlice returns a slice of int pointers from the values
+// passed in.
+func IntSlice(vs []int) []*int {
+ ps := make([]*int, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// IntMap returns a map of int pointers from the values
+// passed in.
+func IntMap(vs map[string]int) map[string]*int {
+ ps := make(map[string]*int, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Int8 returns a pointer value for the int8 value passed in.
+func Int8(v int8) *int8 {
+ return &v
+}
+
+// Int8Slice returns a slice of int8 pointers from the values
+// passed in.
+func Int8Slice(vs []int8) []*int8 {
+ ps := make([]*int8, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Int8Map returns a map of int8 pointers from the values
+// passed in.
+func Int8Map(vs map[string]int8) map[string]*int8 {
+ ps := make(map[string]*int8, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Int16 returns a pointer value for the int16 value passed in.
+func Int16(v int16) *int16 {
+ return &v
+}
+
+// Int16Slice returns a slice of int16 pointers from the values
+// passed in.
+func Int16Slice(vs []int16) []*int16 {
+ ps := make([]*int16, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Int16Map returns a map of int16 pointers from the values
+// passed in.
+func Int16Map(vs map[string]int16) map[string]*int16 {
+ ps := make(map[string]*int16, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Int32 returns a pointer value for the int32 value passed in.
+func Int32(v int32) *int32 {
+ return &v
+}
+
+// Int32Slice returns a slice of int32 pointers from the values
+// passed in.
+func Int32Slice(vs []int32) []*int32 {
+ ps := make([]*int32, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Int32Map returns a map of int32 pointers from the values
+// passed in.
+func Int32Map(vs map[string]int32) map[string]*int32 {
+ ps := make(map[string]*int32, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Int64 returns a pointer value for the int64 value passed in.
+func Int64(v int64) *int64 {
+ return &v
+}
+
+// Int64Slice returns a slice of int64 pointers from the values
+// passed in.
+func Int64Slice(vs []int64) []*int64 {
+ ps := make([]*int64, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Int64Map returns a map of int64 pointers from the values
+// passed in.
+func Int64Map(vs map[string]int64) map[string]*int64 {
+ ps := make(map[string]*int64, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Uint returns a pointer value for the uint value passed in.
+func Uint(v uint) *uint {
+ return &v
+}
+
+// UintSlice returns a slice of uint pointers from the values
+// passed in.
+func UintSlice(vs []uint) []*uint {
+ ps := make([]*uint, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// UintMap returns a map of uint pointers from the values
+// passed in.
+func UintMap(vs map[string]uint) map[string]*uint {
+ ps := make(map[string]*uint, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Uint8 returns a pointer value for the uint8 value passed in.
+func Uint8(v uint8) *uint8 {
+ return &v
+}
+
+// Uint8Slice returns a slice of uint8 pointers from the values
+// passed in.
+func Uint8Slice(vs []uint8) []*uint8 {
+ ps := make([]*uint8, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Uint8Map returns a map of uint8 pointers from the values
+// passed in.
+func Uint8Map(vs map[string]uint8) map[string]*uint8 {
+ ps := make(map[string]*uint8, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Uint16 returns a pointer value for the uint16 value passed in.
+func Uint16(v uint16) *uint16 {
+ return &v
+}
+
+// Uint16Slice returns a slice of uint16 pointers from the values
+// passed in.
+func Uint16Slice(vs []uint16) []*uint16 {
+ ps := make([]*uint16, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Uint16Map returns a map of uint16 pointers from the values
+// passed in.
+func Uint16Map(vs map[string]uint16) map[string]*uint16 {
+ ps := make(map[string]*uint16, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Uint32 returns a pointer value for the uint32 value passed in.
+func Uint32(v uint32) *uint32 {
+ return &v
+}
+
+// Uint32Slice returns a slice of uint32 pointers from the values
+// passed in.
+func Uint32Slice(vs []uint32) []*uint32 {
+ ps := make([]*uint32, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Uint32Map returns a map of uint32 pointers from the values
+// passed in.
+func Uint32Map(vs map[string]uint32) map[string]*uint32 {
+ ps := make(map[string]*uint32, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Uint64 returns a pointer value for the uint64 value passed in.
+func Uint64(v uint64) *uint64 {
+ return &v
+}
+
+// Uint64Slice returns a slice of uint64 pointers from the values
+// passed in.
+func Uint64Slice(vs []uint64) []*uint64 {
+ ps := make([]*uint64, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Uint64Map returns a map of uint64 pointers from the values
+// passed in.
+func Uint64Map(vs map[string]uint64) map[string]*uint64 {
+ ps := make(map[string]*uint64, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Float32 returns a pointer value for the float32 value passed in.
+func Float32(v float32) *float32 {
+ return &v
+}
+
+// Float32Slice returns a slice of float32 pointers from the values
+// passed in.
+func Float32Slice(vs []float32) []*float32 {
+ ps := make([]*float32, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Float32Map returns a map of float32 pointers from the values
+// passed in.
+func Float32Map(vs map[string]float32) map[string]*float32 {
+ ps := make(map[string]*float32, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Float64 returns a pointer value for the float64 value passed in.
+func Float64(v float64) *float64 {
+ return &v
+}
+
+// Float64Slice returns a slice of float64 pointers from the values
+// passed in.
+func Float64Slice(vs []float64) []*float64 {
+ ps := make([]*float64, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// Float64Map returns a map of float64 pointers from the values
+// passed in.
+func Float64Map(vs map[string]float64) map[string]*float64 {
+ ps := make(map[string]*float64, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
+
+// Time returns a pointer value for the time.Time value passed in.
+func Time(v time.Time) *time.Time {
+ return &v
+}
+
+// TimeSlice returns a slice of time.Time pointers from the values
+// passed in.
+func TimeSlice(vs []time.Time) []*time.Time {
+ ps := make([]*time.Time, len(vs))
+ for i, v := range vs {
+ vv := v
+ ps[i] = &vv
+ }
+
+ return ps
+}
+
+// TimeMap returns a map of time.Time pointers from the values
+// passed in.
+func TimeMap(vs map[string]time.Time) map[string]*time.Time {
+ ps := make(map[string]*time.Time, len(vs))
+ for k, v := range vs {
+ vv := v
+ ps[k] = &vv
+ }
+
+ return ps
+}
diff --git a/worker/worker.go b/worker/worker.go
index b2689c59..8ca55a3b 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -10,26 +10,18 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/pkg/relay"
- "github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
- "github.com/spiral/roadrunner/v2/utils"
"go.uber.org/multierr"
+ "go.uber.org/zap"
)
type Options func(p *Process)
-const (
- workerEventsName string = "worker"
-)
-
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
created time.Time
-
- // updates parent supervisor or pool about Process events
- events events.EventBus
- eventsID string
+ log *zap.Logger
// state holds information about current Process state,
// number of Process executions, buf status change time.
@@ -57,27 +49,39 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
return nil, fmt.Errorf("can't attach to running process")
}
- eb, id := events.Bus()
w := &Process{
- created: time.Now(),
- events: eb,
- eventsID: id,
- cmd: cmd,
- state: NewWorkerState(StateInactive),
- doneCh: make(chan struct{}, 1),
+ created: time.Now(),
+ cmd: cmd,
+ state: NewWorkerState(StateInactive),
+ doneCh: make(chan struct{}, 1),
}
- // set self as stderr implementation (Writer interface)
- w.cmd.Stderr = w
-
// add options
for i := 0; i < len(options); i++ {
options[i](w)
}
+ if w.log == nil {
+ z, err := zap.NewDevelopment()
+ if err != nil {
+ return nil, err
+ }
+
+ w.log = z
+ }
+
+ // set self as stderr implementation (Writer interface)
+ w.cmd.Stderr = w
+
return w, nil
}
+func WithLog(z *zap.Logger) Options {
+ return func(p *Process) {
+ p.log = z
+ }
+}
+
// Pid returns worker pid.
func (w *Process) Pid() int64 {
return int64(w.pid)
@@ -137,7 +141,6 @@ func (w *Process) Wait() error {
const op = errors.Op("process_wait")
var err error
err = w.cmd.Wait()
- defer w.events.Unsubscribe(w.eventsID)
w.doneCh <- struct{}{}
// If worker was destroyed, just exit
@@ -182,7 +185,6 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
const op = errors.Op("process_stop")
- defer w.events.Unsubscribe(w.eventsID)
select {
// finished
@@ -213,7 +215,6 @@ func (w *Process) Kill() error {
return err
}
- w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -223,12 +224,12 @@ func (w *Process) Kill() error {
return err
}
w.state.Set(StateStopped)
- w.events.Unsubscribe(w.eventsID)
return nil
}
// Worker stderr
func (w *Process) Write(p []byte) (n int, err error) {
- w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p)))
+ // unsafe to use utils.AsString
+ w.log.Info(string(p))
return len(p), nil
}
diff --git a/worker_watcher/worker_watcher.go b/worker_watcher/worker_watcher.go
index cfde9931..cfadb951 100755
--- a/worker_watcher/worker_watcher.go
+++ b/worker_watcher/worker_watcher.go
@@ -2,7 +2,6 @@ package worker_watcher //nolint:stylecheck
import (
"context"
- "fmt"
"sync"
"sync/atomic"
"time"
@@ -12,10 +11,7 @@ import (
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/spiral/roadrunner/v2/worker_watcher/container/channel"
-)
-
-const (
- wwName string = "worker_watcher"
+ "go.uber.org/zap"
)
// Vector interface represents vector container
@@ -39,22 +35,19 @@ type workerWatcher struct {
// used to control Destroy stage (that all workers are in the container)
numWorkers *uint64
- workers []worker.BaseProcess
- events events.EventBus
- eventsID string
+ workers []worker.BaseProcess
+ log *zap.Logger
allocator worker.Allocator
allocateTimeout time.Duration
}
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher {
- eb, id := events.Bus()
- ww := &workerWatcher{
+func NewSyncWorkerWatcher(allocator worker.Allocator, log *zap.Logger, numWorkers uint64, allocateTimeout time.Duration) *workerWatcher {
+ return &workerWatcher{
container: channel.NewVector(numWorkers),
- events: eb,
- eventsID: id,
+ log: log,
// pass a ptr to the number of workers to avoid blocking in the TTL loop
numWorkers: utils.Uint64(numWorkers),
allocateTimeout: allocateTimeout,
@@ -62,11 +55,11 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, allocat
allocator: allocator,
}
-
- return ww
}
func (ww *workerWatcher) Watch(workers []worker.BaseProcess) error {
+ ww.Lock()
+ defer ww.Unlock()
for i := 0; i < len(workers); i++ {
ww.container.Push(workers[i])
// add worker to watch slice
@@ -143,15 +136,14 @@ func (ww *workerWatcher) Allocate() error {
sw, err := ww.allocator()
if err != nil {
// log incident
- ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker: %v", err)))
// if no timeout, return error immediately
if ww.allocateTimeout == 0 {
return errors.E(op, errors.WorkerAllocate, err)
}
- // every half of a second
- allocateFreq := time.NewTicker(time.Millisecond * 500)
+ // every second
+ allocateFreq := time.NewTicker(time.Millisecond * 1000)
tt := time.After(ww.allocateTimeout)
for {
@@ -167,7 +159,7 @@ func (ww *workerWatcher) Allocate() error {
sw, err = ww.allocator()
if err != nil {
// log incident
- ww.events.Send(events.NewEvent(events.EventWorkerError, wwName, fmt.Sprintf("can't allocate the worker, retry attempt failed: %v", err)))
+ ww.log.Error("allocate retry attempt failed", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
continue
}
@@ -282,7 +274,6 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
ww.container.Destroy()
ww.Unlock()
- ww.events.Unsubscribe(ww.eventsID)
tt := time.NewTicker(time.Millisecond * 10)
defer tt.Stop()
for {
@@ -343,7 +334,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
const op = errors.Op("worker_watcher_wait")
err := w.Wait()
if err != nil {
- ww.events.Send(events.NewEvent(events.EventWorkerWaitExit, wwName, fmt.Sprintf("worker stopped, error: %v", err)))
+ ww.log.Debug("worker stopped", zap.String("internal_event_name", events.EventWorkerWaitExit.String()), zap.Error(err))
}
// remove worker
@@ -351,8 +342,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
- ww.events.Send(events.NewEvent(events.EventWorkerDestruct, wwName, fmt.Sprintf("worker destroyed, pid: %d", w.Pid())))
-
+ ww.log.Debug("worker destroyed", zap.Int64("pid", w.Pid()), zap.String("internal_event_name", events.EventWorkerDestruct.String()), zap.Error(err))
return
}
@@ -361,7 +351,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
err = ww.Allocate()
if err != nil {
- ww.events.Send(events.NewEvent(events.EventWorkerProcessExit, wwName, fmt.Sprintf("failed to allocate worker, error: %v", err)))
+ ww.log.Error("failed to allocate the worker", zap.String("internal_event_name", events.EventWorkerError.String()), zap.Error(err))
// no workers at all, panic
if len(ww.workers) == 0 && atomic.LoadUint64(ww.numWorkers) == 0 {