diff options
Diffstat (limited to 'pkg/transport/pipe/pipe_factory.go')
-rwxr-xr-x | pkg/transport/pipe/pipe_factory.go | 77 |
1 files changed, 58 insertions, 19 deletions
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index 19f4f92d..9433a510 100755 --- a/pkg/transport/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -22,42 +22,54 @@ func NewPipeFactory() *Factory { return &Factory{} } -type SpawnResult struct { +type sr struct { w *worker.Process err error } // SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - c := make(chan SpawnResult) +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit + spCh := make(chan sr) const op = errors.Op("factory_spawn_worker_with_timeout") go func() { w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } in, err := cmd.StdoutPipe() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } out, err := cmd.StdinPipe() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } // Init new PIPE relay @@ -67,42 +79,69 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis // Start the worker err = w.Start() if err != nil { - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + return } - return } - // errors bundle pid, err := internal.FetchPID(relay) - if pid != w.Pid() || err != nil { + if err != nil { err = multierr.Combine( err, w.Kill(), w.Wait(), ) - c <- SpawnResult{ + select { + case spCh <- sr{ w: nil, err: errors.E(op, err), + }: + return + default: + _ = w.Kill() + return } - return } - // everything ok, set ready state - w.State().Set(worker.StateReady) + if pid != w.Pid() { + select { + case spCh <- sr{ + w: nil, + err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), + }: + return + default: + _ = w.Kill() + return + } + } + select { + case // return worker - c <- SpawnResult{ + spCh <- sr{ w: w, err: nil, + }: + // everything ok, set ready state + w.State().Set(worker.StateReady) + return + default: + _ = w.Kill() + return } }() select { case <-ctx.Done(): return nil, ctx.Err() - case res := <-c: + case res := <-spCh: if res.err != nil { return nil, res.err } |