diff options
author | Valery Piashchynski <[email protected]> | 2021-09-16 17:12:37 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-09-16 17:12:37 +0300 |
commit | f3491c089b4da77fd8d2bc942a88b6b8d117a8a5 (patch) | |
tree | 32bfffb1f24eeee7b909747cc00a6a6b9fd3ee83 /pkg/transport/pipe/pipe_factory.go | |
parent | 5d2cd55ab522d4f1e65a833f91146444465a32ac (diff) |
Move plugins to a separate repository
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/transport/pipe/pipe_factory.go')
-rwxr-xr-x | pkg/transport/pipe/pipe_factory.go | 197 |
1 files changed, 0 insertions, 197 deletions
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go deleted file mode 100755 index 9433a510..00000000 --- a/pkg/transport/pipe/pipe_factory.go +++ /dev/null @@ -1,197 +0,0 @@ -package pipe - -import ( - "context" - "os/exec" - - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/pipe" - "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/worker" - "go.uber.org/multierr" -) - -// Factory connects to stack using standard -// streams (STDIN, STDOUT pipes). -type Factory struct{} - -// NewPipeFactory returns new factory instance and starts -// listening -func NewPipeFactory() *Factory { - return &Factory{} -} - -type sr struct { - w *worker.Process - err error -} - -// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, -// method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit - spCh := make(chan sr) - const op = errors.Op("factory_spawn_worker_with_timeout") - go func() { - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - in, err := cmd.StdoutPipe() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - out, err := cmd.StdinPipe() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - return - } - } - - pid, err := internal.FetchPID(relay) - if err != nil { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, err), - }: - return - default: - _ = w.Kill() - return - } - } - - if pid != w.Pid() { - select { - case spCh <- sr{ - w: nil, - err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())), - }: - return - default: - _ = w.Kill() - return - } - } - - select { - case - // return worker - spCh <- sr{ - w: w, - err: nil, - }: - // everything ok, set ready state - w.State().Set(worker.StateReady) - return - default: - _ = w.Kill() - return - } - }() - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case res := <-spCh: - if res.err != nil { - return nil, res.err - } - return res.w, nil - } -} - -func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { - const op = errors.Op("factory_spawn_worker") - w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...)) - if err != nil { - return nil, errors.E(op, err) - } - - in, err := cmd.StdoutPipe() - if err != nil { - return nil, errors.E(op, err) - } - - out, err := cmd.StdinPipe() - if err != nil { - return nil, errors.E(op, err) - } - - // Init new PIPE relay - relay := pipe.NewPipeRelay(in, out) - w.AttachRelay(relay) - - // Start the worker - err = w.Start() - if err != nil { - return nil, errors.E(op, err) - } - - // errors bundle - if pid, err := internal.FetchPID(relay); pid != w.Pid() { - err = multierr.Combine( - err, - w.Kill(), - w.Wait(), - ) - return nil, errors.E(op, err) - } - - // everything ok, set ready state - w.State().Set(worker.StateReady) - return w, nil -} - -// Close the factory. -func (f *Factory) Close() error { - return nil -} |