diff options
Diffstat (limited to 'pipe_factory.go')
-rwxr-xr-x[-rw-r--r--] | pipe_factory.go | 175 |
1 files changed, 129 insertions, 46 deletions
diff --git a/pipe_factory.go b/pipe_factory.go index 9696a474..db00c989 100644..100755 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -1,78 +1,161 @@ package roadrunner import ( - "fmt" - "github.com/pkg/errors" - "github.com/spiral/goridge/v2" - "io" + "context" "os/exec" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3" + "go.uber.org/multierr" ) -// PipeFactory connects to workers using standard +// PipeFactory connects to stack using standard // streams (STDIN, STDOUT pipes). -type PipeFactory struct { -} +type PipeFactory struct{} // NewPipeFactory returns new factory instance and starts // listening -func NewPipeFactory() *PipeFactory { + +// todo: review tests +func NewPipeFactory() Factory { return &PipeFactory{} } -// SpawnWorker creates new worker and connects it to goridge relay, +type SpawnResult struct { + w WorkerBase + err error +} + +// SpawnWorker creates new WorkerProcess and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { - if w, err = newWorker(cmd); err != nil { - return nil, err - } +func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) { + c := make(chan SpawnResult) + const op = errors.Op("spawn worker with context") + go func() { + w, err := InitBaseWorker(cmd) + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } - var ( - in io.ReadCloser - out io.WriteCloser - ) + // TODO why out is in? + in, err := cmd.StdoutPipe() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // TODO why in is out? + out, err := cmd.StdinPipe() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // Init new PIPE relay + relay := goridge.NewPipeRelay(in, out) + w.AttachRelay(relay) - if in, err = cmd.StdoutPipe(); err != nil { - return nil, err + // Start the worker + err = w.Start() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // errors bundle + pid, err := fetchPID(relay) + if pid != w.Pid() || err != nil { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + c <- SpawnResult{ + w: nil, + err: errors.E(op, err), + } + return + } + + // everything ok, set ready state + w.State().Set(StateReady) + + // return worker + c <- SpawnResult{ + w: w, + err: nil, + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-c: + if res.err != nil { + return nil, res.err + } + return res.w, nil } +} - if out, err = cmd.StdinPipe(); err != nil { - return nil, err +func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { + const op = errors.Op("spawn worker") + w, err := InitBaseWorker(cmd) + if err != nil { + return nil, errors.E(op, err) } - w.rl = goridge.NewPipeRelay(in, out) + // TODO why out is in? + in, err := cmd.StdoutPipe() + if err != nil { + return nil, errors.E(op, err) + } - if err := w.start(); err != nil { - return nil, errors.Wrap(err, "process error") + // TODO why in is out? + out, err := cmd.StdinPipe() + if err != nil { + return nil, errors.E(op, err) } - if pid, err := fetchPID(w.rl); pid != *w.Pid { - go func(w *Worker) { - err := w.Kill() - if err != nil { - // there is no logger here, how to handle error in goroutines ? - fmt.Println(fmt.Sprintf("error killing the worker with PID number %d, Created: %s", w.Pid, w.Created)) - } - }(w) - - if wErr := w.Wait(); wErr != nil { - if _, ok := wErr.(*exec.ExitError); ok { - // error might be nil here - if err != nil { - err = errors.Wrap(wErr, err.Error()) - } - } else { - err = wErr - } - } + // Init new PIPE relay + relay := goridge.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + return nil, errors.E(op, err) + } - return nil, errors.Wrap(err, "unable to connect to worker") + // errors bundle + if pid, err := fetchPID(relay); pid != w.Pid() { + err = multierr.Combine( + err, + w.Kill(), + w.Wait(), + ) + return nil, errors.E(op, err) } - w.state.set(StateReady) + // everything ok, set ready state + w.State().Set(StateReady) return w, nil } // Close the factory. -func (f *PipeFactory) Close() error { +func (f *PipeFactory) Close(ctx context.Context) error { return nil } |