diff options
author | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
commit | 0dc44d54cfcc9dd3fa09a41136f35a9a8d26b994 (patch) | |
tree | ffcb65010bebe9f5b5436192979e64b2402a6ec0 /pipe_factory.go | |
parent | 08d6b6b7f773f83b286cd48c1a0fbec9a62fb42b (diff) |
Initial commit of RR 2.0v2.0.0-alpha1
Diffstat (limited to 'pipe_factory.go')
-rw-r--r-- | pipe_factory.go | 182 |
1 files changed, 147 insertions, 35 deletions
diff --git a/pipe_factory.go b/pipe_factory.go index 9696a474..a6c94614 100644 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -1,78 +1,190 @@ package roadrunner import ( + "context" "fmt" + "os/exec" + "strings" + "github.com/pkg/errors" "github.com/spiral/goridge/v2" - "io" - "os/exec" ) -// PipeFactory connects to workers using standard +// PipeFactory connects to stack using standard // streams (STDIN, STDOUT pipes). type PipeFactory struct { } // NewPipeFactory returns new factory instance and starts // listening + +// todo: review tests func NewPipeFactory() *PipeFactory { return &PipeFactory{} } -// SpawnWorker creates new worker and connects it to goridge relay, +type SpawnResult struct { + w WorkerBase + err error +} + +// SpawnWorker creates new WorkerProcess and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { - if w, err = newWorker(cmd); err != nil { - return nil, err +func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) { + c := make(chan SpawnResult) + go func() { + w, err := InitBaseWorker(cmd) + if err != nil { + c <- SpawnResult{ + w: nil, + err: err, + } + return + } + + // TODO why out is in? + in, err := cmd.StdoutPipe() + if err != nil { + c <- SpawnResult{ + w: nil, + err: err, + } + return + } + + // TODO why in is out? + out, err := cmd.StdinPipe() + if err != nil { + c <- SpawnResult{ + w: nil, + err: err, + } + return + } + + // Init new PIPE relay + relay := goridge.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + c <- SpawnResult{ + w: nil, + err: errors.Wrap(err, "process error"), + } + return + } + + // errors bundle + var errs []string + if pid, errF := fetchPID(relay); pid != w.Pid() { + if errF != nil { + errs = append(errs, errF.Error()) + } + + // todo kill timeout + errK := w.Kill(ctx) + if errK != nil { + errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error()) + } + + if wErr := w.Wait(ctx); wErr != nil { + errs = append(errs, wErr.Error()) + } + + if len(errs) > 0 { + c <- SpawnResult{ + w: nil, + err: errors.New(strings.Join(errs, " : ")), + } + } else { + c <- SpawnResult{ + w: nil, + err: nil, + } + } + + return + } + + // everything ok, set ready state + w.State().Set(StateReady) + + // return worker + c <- SpawnResult{ + w: w, + err: nil, + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-c: + if res.err != nil { + return nil, res.err + } + return res.w, nil } +} - var ( - in io.ReadCloser - out io.WriteCloser - ) +func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) { + w, err := InitBaseWorker(cmd) + if err != nil { + return nil, err + } - if in, err = cmd.StdoutPipe(); err != nil { + // TODO why out is in? + in, err := cmd.StdoutPipe() + if err != nil { return nil, err } - if out, err = cmd.StdinPipe(); err != nil { + // TODO why in is out? + out, err := cmd.StdinPipe() + if err != nil { return nil, err } - w.rl = goridge.NewPipeRelay(in, out) + // Init new PIPE relay + relay := goridge.NewPipeRelay(in, out) + w.AttachRelay(relay) - if err := w.start(); err != nil { + // Start the worker + err = w.Start() + if err != nil { return nil, errors.Wrap(err, "process error") } - if pid, err := fetchPID(w.rl); pid != *w.Pid { - go func(w *Worker) { - err := w.Kill() - if err != nil { - // there is no logger here, how to handle error in goroutines ? - fmt.Println(fmt.Sprintf("error killing the worker with PID number %d, Created: %s", w.Pid, w.Created)) - } - }(w) + // errors bundle + var errs []string + if pid, errF := fetchPID(relay); pid != w.Pid() { + if errF != nil { + errs = append(errs, errF.Error()) + } - if wErr := w.Wait(); wErr != nil { - if _, ok := wErr.(*exec.ExitError); ok { - // error might be nil here - if err != nil { - err = errors.Wrap(wErr, err.Error()) - } - } else { - err = wErr - } + // todo kill timeout ?? + errK := w.Kill(context.Background()) + if errK != nil { + errs = append(errs, fmt.Errorf("error killing the worker with PID number %d, Created: %s", w.Pid(), w.Created()).Error()) } - return nil, errors.Wrap(err, "unable to connect to worker") + if wErr := w.Wait(context.Background()); wErr != nil { + errs = append(errs, wErr.Error()) + } + + if len(errs) > 0 { + return nil, errors.New(strings.Join(errs, "/")) + } } - w.state.set(StateReady) + // everything ok, set ready state + w.State().Set(StateReady) return w, nil } // Close the factory. -func (f *PipeFactory) Close() error { +func (f *PipeFactory) Close(ctx context.Context) error { return nil } |