diff options
author | Valery Piashchynski <[email protected]> | 2021-08-03 13:36:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-03 13:36:31 +0300 |
commit | 606e2170ccac5a13a11198aaf54e4219a83291ab (patch) | |
tree | 6eeb30453e7a1582f339e78772d639f00115221c /pkg | |
parent | 31752d8bd20294c7d52cd3612fbf18e44ce42637 (diff) |
In rare cases, when the user sets a small timeout for allocating a worker,
the spawned goroutine might get stuck on the channel send operation and leak
memory.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pool/static_pool.go | 16 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 2 | ||||
-rw-r--r-- | pkg/transport/interface.go | 2 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory.go | 77 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 1 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory.go | 57 |
6 files changed, 118 insertions, 37 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 5990f929..1cd0a8fa 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -289,6 +289,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return nil, err } + // wrap sync worker sw := worker.From(w) sp.events.Push(events.PoolEvent{ @@ -301,18 +302,25 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio // execDebug used when debug mode was not set and exec_ttl is 0 func (sp *StaticPool) execDebug(p *payload.Payload) (*payload.Payload, error) { + const op = errors.Op("static_pool_exec_debug") sw, err := sp.allocator() if err != nil { return nil, err } - // redirect call to the workers exec method (without ttl) + // redirect call to the workers' exec method (without ttl) r, err := sw.Exec(p) - if stopErr := sw.Stop(); stopErr != nil { + if err != nil { + return nil, errors.E(op, err) + } + + err = sw.Stop() + if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) + return nil, errors.E(op, err) } - return r, err + return r, nil } // execDebugWithTTL used when user set debug mode and exec_ttl @@ -333,7 +341,7 @@ func (sp *StaticPool) execDebugWithTTL(ctx context.Context, p *payload.Payload) // allocate required number of stack func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) { - const op = errors.Op("allocate workers") + const op = errors.Op("static_pool_allocate_workers") workers := make([]worker.BaseProcess, 0, numWorkers) // constant number of stack simplify logic diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index cbb7ad7b..bdaeade1 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -221,7 +221,7 @@ func (sp *supervised) control() { //nolint:gocognit workers[i].State().Set(worker.StateInvalid) _ = workers[i].Stop() } - // just to double check + // just to double-check 
workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } diff --git a/pkg/transport/interface.go b/pkg/transport/interface.go index 7e3e5350..1b072378 100644 --- a/pkg/transport/interface.go +++ b/pkg/transport/interface.go @@ -8,7 +8,7 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker" ) -// Factory is responsible of wrapping given command into tasks WorkerProcess. +// Factory is responsible for wrapping given command into tasks WorkerProcess. type Factory interface { // SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context. // Process must not be started. diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index 19f4f92d..9433a510 100755 --- a/pkg/transport/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -22,42 +22,54 @@ func NewPipeFactory() *Factory { return &Factory{} } -type SpawnResult struct { +type sr struct { w *worker.Process err error } // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. 
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - c := make(chan SpawnResult) +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit + spCh := make(chan sr) const op = errors.Op("factory_spawn_worker_with_timeout") go func() { w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } in, err := cmd.StdoutPipe() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } out, err := cmd.StdinPipe() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } // Init new PIPE relay @@ -67,42 +79,69 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis // Start the worker err = w.Start() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } - // errors bundle pid, err := internal.FetchPID(relay) - if pid != w.Pid() || err != nil { + if err != nil { err = multierr.Combine( err, w.Kill(), w.Wait(), ) - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + _ = w.Kill() + return } - return } - // everything ok, set ready state - w.State().Set(worker.StateReady) + if pid != w.Pid() { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), + }: + return + default: + _ = w.Kill() + return + } + } + select { + case // return worker - c <- SpawnResult{ + spCh <- sr{ w: w, err: nil, + }: + // everything ok, set ready state + 
w.State().Set(worker.StateReady) + return + default: + _ = w.Kill() + return } }() select { case <-ctx.Done(): return nil, ctx.Err() - case res := <-c: + case res := <-spCh: if res.err != nil { return nil, res.err } diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index 5c937a97..d243a93f 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -102,6 +102,7 @@ func Test_Pipe_PipeError(t *testing.T) { func Test_Pipe_PipeError2(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "echo", "pipes") + // error cause _, err := cmd.StdinPipe() if err != nil { t.Errorf("error creating the STDIN pipe: error %v", err) diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index 965a0f30..dc2b75cf 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -2,6 +2,7 @@ package socket import ( "context" + "fmt" "net" "os/exec" "sync" @@ -29,8 +30,6 @@ type Factory struct { // sockets which are waiting for process association relays sync.Map - - ErrCh chan error } // NewSocketServer returns Factory attached to a given socket listener. 
@@ -40,14 +39,17 @@ func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { ls: ls, tout: tout, relays: sync.Map{}, - ErrCh: make(chan error, 10), } // Be careful // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go // https://github.com/golang/go/issues/5045 go func() { - f.ErrCh <- f.listen() + err := f.listen() + // there is no logger here, use fmt + if err != nil { + fmt.Printf("[WARN]: socket server listen, error: %v\n", err) + } }() return f @@ -90,20 +92,28 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis defer cancel() w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: nil, - err: err, + err: errors.E(op, err), + }: + return + default: + return } - return } err = w.Start() if err != nil { - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } rl, err := f.findRelayWithContext(ctxT, w) @@ -114,19 +124,31 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis w.Wait(), ) - c <- socketSpawn{ + select { + // try to write result + case c <- socketSpawn{ w: nil, err: errors.E(op, err), + }: + return + // if no receivers - return + default: + return } - return } w.AttachRelay(rl) w.State().Set(worker.StateReady) - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: w, err: nil, + }: + return + default: + _ = w.Kill() + return } }() @@ -165,6 +187,17 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } w.AttachRelay(rl) + + // errors bundle + if pid, err := internal.FetchPID(rl); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) + } + w.State().Set(worker.StateReady) return w, nil |