summaryrefslogtreecommitdiff
path: root/pkg/transport/socket/socket_factory.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-03 13:36:31 +0300
committerValery Piashchynski <[email protected]>2021-08-03 13:36:31 +0300
commit606e2170ccac5a13a11198aaf54e4219a83291ab (patch)
tree6eeb30453e7a1582f339e78772d639f00115221c /pkg/transport/socket/socket_factory.go
parent31752d8bd20294c7d52cd3612fbf18e44ce42637 (diff)
In a rare cases, when user set small timeout to allocate a worker,
spawned goroutine might stuck on the channel send operation and leak memory. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/transport/socket/socket_factory.go')
-rwxr-xr-xpkg/transport/socket/socket_factory.go57
1 files changed, 45 insertions, 12 deletions
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index 965a0f30..dc2b75cf 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -2,6 +2,7 @@ package socket
import (
"context"
+ "fmt"
"net"
"os/exec"
"sync"
@@ -29,8 +30,6 @@ type Factory struct {
// sockets which are waiting for process association
relays sync.Map
-
- ErrCh chan error
}
// NewSocketServer returns Factory attached to a given socket listener.
@@ -40,14 +39,17 @@ func NewSocketServer(ls net.Listener, tout time.Duration) *Factory {
ls: ls,
tout: tout,
relays: sync.Map{},
- ErrCh: make(chan error, 10),
}
// Be careful
// https://github.com/go101/go101/wiki/About-memory-ordering-guarantees-made-by-atomic-operations-in-Go
// https://github.com/golang/go/issues/5045
go func() {
- f.ErrCh <- f.listen()
+ err := f.listen()
+ // there is no logger here, use fmt
+ if err != nil {
+ fmt.Printf("[WARN]: socket server listen, error: %v\n", err)
+ }
}()
return f
@@ -90,20 +92,28 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
defer cancel()
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: nil,
- err: err,
+ err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
err = w.Start()
if err != nil {
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
rl, err := f.findRelayWithContext(ctxT, w)
@@ -114,19 +124,31 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
w.Wait(),
)
- c <- socketSpawn{
+ select {
+ // try to write result
+ case c <- socketSpawn{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ // if no receivers - return
+ default:
+ return
}
- return
}
w.AttachRelay(rl)
w.State().Set(worker.StateReady)
- c <- socketSpawn{
+ select {
+ case c <- socketSpawn{
w: w,
err: nil,
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
}
}()
@@ -165,6 +187,17 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
w.AttachRelay(rl)
+
+ // errors bundle
+ if pid, err := internal.FetchPID(rl); pid != w.Pid() {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+ return nil, errors.E(op, err)
+ }
+
w.State().Set(worker.StateReady)
return w, nil