summaryrefslogtreecommitdiff
path: root/pkg/transport/pipe/pipe_factory.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/transport/pipe/pipe_factory.go')
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go77
1 files changed, 58 insertions, 19 deletions
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
index 19f4f92d..9433a510 100755
--- a/pkg/transport/pipe/pipe_factory.go
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -22,42 +22,54 @@ func NewPipeFactory() *Factory {
return &Factory{}
}
-type SpawnResult struct {
+type sr struct {
w *worker.Process
err error
}
// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- c := make(chan SpawnResult)
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit
+ spCh := make(chan sr)
const op = errors.Op("factory_spawn_worker_with_timeout")
go func() {
w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
in, err := cmd.StdoutPipe()
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
out, err := cmd.StdinPipe()
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
// Init new PIPE relay
@@ -67,42 +79,69 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
// Start the worker
err = w.Start()
if err != nil {
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ return
}
- return
}
- // errors bundle
pid, err := internal.FetchPID(relay)
- if pid != w.Pid() || err != nil {
+ if err != nil {
err = multierr.Combine(
err,
w.Kill(),
w.Wait(),
)
- c <- SpawnResult{
+ select {
+ case spCh <- sr{
w: nil,
err: errors.E(op, err),
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
}
- return
}
- // everything ok, set ready state
- w.State().Set(worker.StateReady)
+ if pid != w.Pid() {
+ select {
+ case spCh <- sr{
+ w: nil,
+ err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())),
+ }:
+ return
+ default:
+ _ = w.Kill()
+ return
+ }
+ }
+ select {
+ case
// return worker
- c <- SpawnResult{
+ spCh <- sr{
w: w,
err: nil,
+ }:
+ // everything ok, set ready state
+ w.State().Set(worker.StateReady)
+ return
+ default:
+ _ = w.Kill()
+ return
}
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
- case res := <-c:
+ case res := <-spCh:
if res.err != nil {
return nil, res.err
}