summaryrefslogtreecommitdiff
path: root/pkg/transport/pipe/pipe_factory.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-16 17:12:37 +0300
committerValery Piashchynski <[email protected]>2021-09-16 17:12:37 +0300
commitf3491c089b4da77fd8d2bc942a88b6b8d117a8a5 (patch)
tree32bfffb1f24eeee7b909747cc00a6a6b9fd3ee83 /pkg/transport/pipe/pipe_factory.go
parent5d2cd55ab522d4f1e65a833f91146444465a32ac (diff)
Move plugins to a separate repository
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/transport/pipe/pipe_factory.go')
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go197
1 files changed, 0 insertions, 197 deletions
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
deleted file mode 100755
index 9433a510..00000000
--- a/pkg/transport/pipe/pipe_factory.go
+++ /dev/null
@@ -1,197 +0,0 @@
-package pipe
-
-import (
- "context"
- "os/exec"
-
- "github.com/spiral/errors"
- "github.com/spiral/goridge/v3/pkg/pipe"
- "github.com/spiral/roadrunner/v2/internal"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "go.uber.org/multierr"
-)
-
-// Factory connects to stack using standard
-// streams (STDIN, STDOUT pipes).
-type Factory struct{}
-
-// NewPipeFactory returns new factory instance and starts
-// listening
-func NewPipeFactory() *Factory {
- return &Factory{}
-}
-
-type sr struct {
- w *worker.Process
- err error
-}
-
-// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay,
-// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) { //nolint:gocognit
- spCh := make(chan sr)
- const op = errors.Op("factory_spawn_worker_with_timeout")
- go func() {
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
- if err != nil {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- in, err := cmd.StdoutPipe()
- if err != nil {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- out, err := cmd.StdinPipe()
- if err != nil {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- // Init new PIPE relay
- relay := pipe.NewPipeRelay(in, out)
- w.AttachRelay(relay)
-
- // Start the worker
- err = w.Start()
- if err != nil {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- return
- }
- }
-
- pid, err := internal.FetchPID(relay)
- if err != nil {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, err),
- }:
- return
- default:
- _ = w.Kill()
- return
- }
- }
-
- if pid != w.Pid() {
- select {
- case spCh <- sr{
- w: nil,
- err: errors.E(op, errors.Errorf("pid mismatches, get: %d, want: %d", pid, w.Pid())),
- }:
- return
- default:
- _ = w.Kill()
- return
- }
- }
-
- select {
- case
- // return worker
- spCh <- sr{
- w: w,
- err: nil,
- }:
- // everything ok, set ready state
- w.State().Set(worker.StateReady)
- return
- default:
- _ = w.Kill()
- return
- }
- }()
-
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case res := <-spCh:
- if res.err != nil {
- return nil, res.err
- }
- return res.w, nil
- }
-}
-
-func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("factory_spawn_worker")
- w, err := worker.InitBaseWorker(cmd, worker.AddListeners(listeners...))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- in, err := cmd.StdoutPipe()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- out, err := cmd.StdinPipe()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // Init new PIPE relay
- relay := pipe.NewPipeRelay(in, out)
- w.AttachRelay(relay)
-
- // Start the worker
- err = w.Start()
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // errors bundle
- if pid, err := internal.FetchPID(relay); pid != w.Pid() {
- err = multierr.Combine(
- err,
- w.Kill(),
- w.Wait(),
- )
- return nil, errors.E(op, err)
- }
-
- // everything ok, set ready state
- w.State().Set(worker.StateReady)
- return w, nil
-}
-
-// Close the factory.
-func (f *Factory) Close() error {
- return nil
-}