summaryrefslogtreecommitdiff
path: root/pipe_factory.go
diff options
context:
space:
mode:
Diffstat (limited to 'pipe_factory.go')
-rwxr-xr-x[-rw-r--r--]pipe_factory.go175
1 files changed, 129 insertions, 46 deletions
diff --git a/pipe_factory.go b/pipe_factory.go
index 9696a474..db00c989 100644..100755
--- a/pipe_factory.go
+++ b/pipe_factory.go
@@ -1,78 +1,161 @@
package roadrunner
import (
- "fmt"
- "github.com/pkg/errors"
- "github.com/spiral/goridge/v2"
- "io"
+ "context"
"os/exec"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/goridge/v3"
+ "go.uber.org/multierr"
)
-// PipeFactory connects to workers using standard
+// PipeFactory connects to stack using standard
// streams (STDIN, STDOUT pipes).
-type PipeFactory struct {
-}
+type PipeFactory struct{}
// NewPipeFactory returns new factory instance and starts
// listening
-func NewPipeFactory() *PipeFactory {
+
+// todo: review tests
+func NewPipeFactory() Factory {
return &PipeFactory{}
}
-// SpawnWorker creates new worker and connects it to goridge relay,
+type SpawnResult struct {
+ w WorkerBase
+ err error
+}
+
+// SpawnWorker creates new WorkerProcess and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) {
- if w, err = newWorker(cmd); err != nil {
- return nil, err
- }
+func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error) {
+ c := make(chan SpawnResult)
+ const op = errors.Op("spawn worker with context")
+ go func() {
+ w, err := InitBaseWorker(cmd)
+ if err != nil {
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
- var (
- in io.ReadCloser
- out io.WriteCloser
- )
+ // TODO why out is in?
+ in, err := cmd.StdoutPipe()
+ if err != nil {
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ // TODO why in is out?
+ out, err := cmd.StdinPipe()
+ if err != nil {
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ // Init new PIPE relay
+ relay := goridge.NewPipeRelay(in, out)
+ w.AttachRelay(relay)
- if in, err = cmd.StdoutPipe(); err != nil {
- return nil, err
+ // Start the worker
+ err = w.Start()
+ if err != nil {
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ // errors bundle
+ pid, err := fetchPID(relay)
+ if pid != w.Pid() || err != nil {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+ c <- SpawnResult{
+ w: nil,
+ err: errors.E(op, err),
+ }
+ return
+ }
+
+ // everything ok, set ready state
+ w.State().Set(StateReady)
+
+ // return worker
+ c <- SpawnResult{
+ w: w,
+ err: nil,
+ }
+ }()
+
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case res := <-c:
+ if res.err != nil {
+ return nil, res.err
+ }
+ return res.w, nil
}
+}
- if out, err = cmd.StdinPipe(); err != nil {
- return nil, err
+func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error) {
+ const op = errors.Op("spawn worker")
+ w, err := InitBaseWorker(cmd)
+ if err != nil {
+ return nil, errors.E(op, err)
}
- w.rl = goridge.NewPipeRelay(in, out)
+ // TODO why out is in?
+ in, err := cmd.StdoutPipe()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
- if err := w.start(); err != nil {
- return nil, errors.Wrap(err, "process error")
+ // TODO why in is out?
+ out, err := cmd.StdinPipe()
+ if err != nil {
+ return nil, errors.E(op, err)
}
- if pid, err := fetchPID(w.rl); pid != *w.Pid {
- go func(w *Worker) {
- err := w.Kill()
- if err != nil {
- // there is no logger here, how to handle error in goroutines ?
- fmt.Println(fmt.Sprintf("error killing the worker with PID number %d, Created: %s", w.Pid, w.Created))
- }
- }(w)
-
- if wErr := w.Wait(); wErr != nil {
- if _, ok := wErr.(*exec.ExitError); ok {
- // error might be nil here
- if err != nil {
- err = errors.Wrap(wErr, err.Error())
- }
- } else {
- err = wErr
- }
- }
+ // Init new PIPE relay
+ relay := goridge.NewPipeRelay(in, out)
+ w.AttachRelay(relay)
+
+ // Start the worker
+ err = w.Start()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
- return nil, errors.Wrap(err, "unable to connect to worker")
+ // errors bundle
+ if pid, err := fetchPID(relay); pid != w.Pid() {
+ err = multierr.Combine(
+ err,
+ w.Kill(),
+ w.Wait(),
+ )
+ return nil, errors.E(op, err)
}
- w.state.set(StateReady)
+ // everything ok, set ready state
+ w.State().Set(StateReady)
return w, nil
}
// Close the factory.
-func (f *PipeFactory) Close() error {
+func (f *PipeFactory) Close(ctx context.Context) error {
return nil
}