diff options
Diffstat (limited to 'ipc/pipe/pipe_factory.go')
-rwxr-xr-x | ipc/pipe/pipe_factory.go | 178 |
1 files changed, 0 insertions, 178 deletions
diff --git a/ipc/pipe/pipe_factory.go b/ipc/pipe/pipe_factory.go deleted file mode 100755 index 4a3c9a67..00000000 --- a/ipc/pipe/pipe_factory.go +++ /dev/null @@ -1,178 +0,0 @@ -package pipe - -import ( - "context" - "os/exec" - - "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/worker" - "go.uber.org/zap" -) - -// Factory connects to stack using standard -// streams (STDIN, STDOUT pipes). -type Factory struct { - log *zap.Logger -} - -// NewPipeFactory returns new factory instance and starts -// listening -func NewPipeFactory(log *zap.Logger) *Factory { - return &Factory{ - log: log, - } -} - -type sr struct { - w *worker.Process - err error -} - -// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, -// method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { - spCh := make(chan sr) - go func() { - w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - return - } - } - - in, err := cmd.StdoutPipe() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - return - } - } - - out, err := cmd.StdinPipe() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - return - } - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - return - } - } - - // used as a ping - _, err = internal.Pid(relay) - if err != nil { - _ = w.Kill() - select { - case spCh <- sr{ - w: nil, - err: err, - }: - return - default: - _ = w.Kill() - return - } - } - - select { - case - // return worker - spCh <- sr{ - w: w, - err: nil, - }: - // everything ok, set ready state - w.State().Set(worker.StateReady) - return - default: - _ = w.Kill() - return - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-spCh: - if res.err != nil { - return nil, res.err - } - return res.w, nil - } -} - -func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { - w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) - if err != nil { - return nil, err - } - - in, err := cmd.StdoutPipe() - if err != nil { - return nil, err - } - - out, err := cmd.StdinPipe() - if err != nil { - return nil, err - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - return nil, err - } - - // errors bundle - _, err = internal.Pid(relay) - if err != nil { - _ = w.Kill() - return nil, err - } - - // everything ok, set ready state - w.State().Set(worker.StateReady) - return w, nil -} - -// Close the factory. -func (f *Factory) Close() error { - return nil -} |