summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-03 13:36:31 +0300
committerValery Piashchynski <[email protected]>2021-08-03 13:36:31 +0300
commit606e2170ccac5a13a11198aaf54e4219a83291ab (patch)
tree6eeb30453e7a1582f339e78772d639f00115221c
parent31752d8bd20294c7d52cd3612fbf18e44ce42637 (diff)
In a rare cases, when user set small timeout to allocate a worker,
spawned goroutine might stuck on the channel send operation and leak memory. Signed-off-by: Valery Piashchynski <[email protected]>
-rwxr-xr-xpkg/pool/static_pool.go16
-rwxr-xr-xpkg/pool/supervisor_pool.go2
-rw-r--r--pkg/transport/interface.go2
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go77
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go1
-rwxr-xr-xpkg/transport/socket/socket_factory.go57
6 files changed, 118 insertions, 37 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 5990f929..1cd0a8fa 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -289,6 +289,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return nil, err
}
+ // wrap sync worker
sw := worker.From(w)
sp.events.Push(events.PoolEvent{
@@ -301,18 +302,25 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
// execDebug used when debug mode was not set and exec_ttl is 0
func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) {
+ const op = errors.Op("static_pool_exec_debug")
sw, err := sp.allocator()
if err != nil {
return nil, err
}
- // redirect call to the workers exec method (without ttl)
+ // redirect call to the workers' exec method (without ttl)
r, err := sw.Exec(p)
- if stopErr := sw.Stop(); stopErr != nil {
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ err = sw.Stop()
+ if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
+ return nil, errors.E(op, err)
}
- return r, err
+ return r, nil
}
// execDebugWithTTL used when user set debug mode and exec_ttl
@@ -333,7 +341,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload)
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
- const op = errors.Op("allocate workers")
+ const op = errors.Op("static_pool_allocate_workers")
workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index cbb7ad7b..bdaeade1 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -221,7 +221,7 @@ func (sp *supervised) control() { //nolint:gocognit
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
- // just to double check
+ // just to double-check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go
index 7e3e5350..1b072378 100644
--- a/pkg/transport/interface.go
+++ b/pkg/transport/interface.go
@@ -8,7 +8,7 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker"
)
-// Factory is responsible of wrapping given command into tasks WorkerProcess.
+// Factory is responsible for wrapping given command into tasks WorkerProcess.
type Factory interface {
// SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context.
// Process must not be started.
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
index 19f4f92d..9433a510 100755
--- a/pkg/transport/pipe/pipe_factory.go
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -22,42 +22,54 @@ func NewPipeFactory() *Factory {
return &Factory{}
}
-type SpawnResult struct {
+type sr struct {
w *worker.Process
err error
}
// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- c := make(chan SpawnResult)
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit
+ spCh := make(chan sr)
const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
in, err := cmd.StdoutPipe()
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
out, err := cmd.StdinPipe()
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
// Init new PIPE relay
@@ -67,42 +79,69 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
// Start the worker
err = w.Start()
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
- // errors bundle
pid, err := internal.FetchPID(relay)
- if pid != w.Pid() || err != nil {
+ if err != nil {
err = multierr.Combine(
err,
w.Kill(),
w.Wait(),
)
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
}
- return
}
- // everything ok, set ready state
- w.State().Set(worker.StateReady)
+ if pid != w.Pid() {
+ select {
+ case spCh <- sr{
+ w: nil,
+ err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())),
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
+ }
+ }
+ select {
+ case
// return worker
- c <- SpawnResult{
+ spCh <- sr{
w: w,
err: nil,
+ }:
+ // everything ok, set ready state
+ w.State().Set(worker.StateReady)
+ return
+ default:
+ _ = w.Kill()
+ return
}
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
- case res := <-c:
+ case res := <-spCh:
if res.err != nil {
return nil, res.err
}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index 5c937a97..d243a93f 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -102,6 +102,7 @@ func Test_Pipe_PipeError(t *testing.T) {
func Test_Pipe_PipeError2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes")
+ // error cause
_, err := cmd.StdinPipe()
if err != nil {
t.Errorf("error creating the STDIN pipe: error %v", err)
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index 965a0f30..dc2b75cf 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -2,6 +2,7 @@ package socket
import (
"context"
+ "fmt"
"net"
"os/exec"
"sync"
@@ -29,8 +30,6 @@ type Factory struct {
// sockets which are waiting for process association
relays sync.Map
-
- ErrCh chan error
}
// NewSocketServer returns Factory attached to a given socket listener.
@@ -40,14 +39,17 @@ func NewSocketServer(ls net.Listener, tout time.Duration) *Factory {
ls: ls,
tout: tout,
relays: sync.Map{},
- ErrCh: make(chan error, 10),
}
// Be careful
// https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go
// https://github.com/golang/go/issues/5045
go func() {
- f.ErrCh <- f.listen()
+ err := f.listen()
+ // there is no logger here, use fmt
+ if err != nil {
+ fmt.Printf("[WARN]: socket server listen, error: %v\n", err)
+ }
}()
return f
@@ -90,20 +92,28 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
defer cancel()
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: nil,
- err: err,
+ err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
err = w.Start()
if err != nil {
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
rl, err := f.findRelayWithContext(ctxT, w)
@@ -114,19 +124,31 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
w.Wait(),
)
- c <- socketSpawn{
+ select {
+ // try to write result
+ case c <- socketSpawn{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ // if no receivers - return
+ default:
+ return
}
- return
}
w.AttachRelay(rl)
w.State().Set(worker.StateReady)
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: w,
err: nil,
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
}
}()
@@ -165,6 +187,17 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
w.AttachRelay(rl)
+
+ // errors bundle
+ if pid, err := internal.FetchPID(rl); pid != w.Pid() {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+ return nil, errors.E(op, err)
+ }
+
w.State().Set(worker.StateReady)
return w, nil