summaryrefslogtreecommitdiff
path: root/pipe_factory.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-01-23 19:51:15 -0500
committerWolfy-J <[email protected]>2018-01-23 19:51:15 -0500
commit78a42de837928cf7d10a1ae04d7e82e56d66e1e2 (patch)
tree8882b9a051bcc9c42328df583c0bb8c39a89591e /pipe_factory.go
parentfa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff)
API update
Diffstat (limited to 'pipe_factory.go')
-rw-r--r--pipe_factory.go48
1 files changed, 31 insertions, 17 deletions
diff --git a/pipe_factory.go b/pipe_factory.go
index ce32dacc..30c34139 100644
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -2,45 +2,59 @@ package roadrunner
import (
"github.com/spiral/goridge"
+ "io"
"os/exec"
+ "github.com/pkg/errors"
)
-// PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes).
+// PipeFactory connects to workers using standard
+// streams (STDIN, STDOUT pipes).
type PipeFactory struct {
}
-// NewPipeFactory returns new factory instance and starts listening
+// NewPipeFactory returns new factory instance and starts
+// listening
func NewPipeFactory() *PipeFactory {
return &PipeFactory{}
}
-// NewWorker creates worker and connects it to appropriate relay or returns error
-func (f *PipeFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) {
- w, err = NewWorker(cmd)
- if err != nil {
+// SpawnWorker creates new worker and connects it to goridge relay,
+// method Wait() must be handled on level above.
+func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
+ if w, err = newWorker(cmd); err != nil {
return nil, err
}
- in, err := cmd.StdoutPipe()
- if err != nil {
+ var (
+ in io.ReadCloser
+ out io.WriteCloser
+ )
+
+ if in, err = cmd.StdoutPipe(); err != nil {
return nil, err
}
- out, err := cmd.StdinPipe()
- if err != nil {
+ if out, err = cmd.StdinPipe(); err != nil {
return nil, err
}
+ w.rl = goridge.NewPipeRelay(in, out)
+
if err := w.Start(); err != nil {
- return nil, err
+ return nil, errors.Wrap(err, "process error")
}
- w.attach(goridge.NewPipeRelay(in, out))
+ // todo: timeout ?
+ if pid, err := fetchPID(w.rl); pid != *w.Pid {
+ go func(w *Worker) { w.Kill() }(w)
- return w, nil
-}
+ if wErr := w.Wait(); wErr != nil {
+ err = errors.Wrap(wErr, err.Error())
+ }
+
+ return nil, errors.Wrap(err, "unable to connect to worker")
+ }
-// Close closes all open factory descriptors.
-func (f *PipeFactory) Close() error {
- return nil
+ w.state.set(StateReady)
+ return w, nil
}