diff options
author | Valery Piashchynski <[email protected]> | 2021-08-03 13:36:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-03 13:36:31 +0300 |
commit | 606e2170ccac5a13a11198aaf54e4219a83291ab (patch) | |
tree | 6eeb30453e7a1582f339e78772d639f00115221c /pkg/transport/socket/socket_factory.go | |
parent | 31752d8bd20294c7d52cd3612fbf18e44ce42637 (diff) |
In a rare cases, when user set small timeout to allocate a worker,
spawned goroutine might stuck on the channel send operation and leak
memory.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/transport/socket/socket_factory.go')
-rwxr-xr-x | pkg/transport/socket/socket_factory.go | 57 |
1 files changed, 45 insertions, 12 deletions
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index 965a0f30..dc2b75cf 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -2,6 +2,7 @@ package socket import ( "context" + "fmt" "net" "os/exec" "sync" @@ -29,8 +30,6 @@ type Factory struct { // sockets which are waiting for process association relays sync.Map - - ErrCh chan error } // NewSocketServer returns Factory attached to a given socket listener. @@ -40,14 +39,17 @@ func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { ls: ls, tout: tout, relays: sync.Map{}, - ErrCh: make(chan error, 10), } // Be careful // https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go // https://github.com/golang/go/issues/5045 go func() { - f.ErrCh <- f.listen() + err := f.listen() + // there is no logger here, use fmt + if err != nil { + fmt.Printf("[WARN]: socket server listen, error: %v\n", err) + } }() return f @@ -90,20 +92,28 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis defer cancel() w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: nil, - err: err, + err: errors.E(op, err), + }: + return + default: + return } - return } err = w.Start() if err != nil { - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } rl, err := f.findRelayWithContext(ctxT, w) @@ -114,19 +124,31 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis w.Wait(), ) - c <- socketSpawn{ + select { + // try to write result + case c <- socketSpawn{ w: nil, err: errors.E(op, err), + }: + return + // if no receivers - return + default: + return } - return } w.AttachRelay(rl) w.State().Set(worker.StateReady) - c <- socketSpawn{ + select { + case c <- socketSpawn{ w: w, err: nil, + }: + return + default: + _ = w.Kill() + return } }() @@ -165,6 +187,17 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } w.AttachRelay(rl) + + // errors bundle + if pid, err := internal.FetchPID(rl); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) + } + w.State().Set(worker.StateReady) return w, nil |