diff options
author | Valery Piashchynski <[email protected]> | 2022-01-12 00:19:48 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2022-01-12 00:19:48 +0300 |
commit | 215c7c91937bf65704db18a59a327a3c64e43530 (patch) | |
tree | 85d2b32ddf7230064e620bb59f76ef4bfd6b24cf /ipc/pipe/pipe_factory.go | |
parent | 7b5d220f0f1be155d83d887cd4996bdf4394c570 (diff) |
pass logger from the factory
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'ipc/pipe/pipe_factory.go')
-rwxr-xr-x | ipc/pipe/pipe_factory.go | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/ipc/pipe/pipe_factory.go b/ipc/pipe/pipe_factory.go new file mode 100755 index 00000000..4a3c9a67 --- /dev/null +++ b/ipc/pipe/pipe_factory.go @@ -0,0 +1,178 @@ +package pipe + +import ( + "context" + "os/exec" + + "github.com/spiral/goridge/v3/pkg/pipe" + "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/worker" + "go.uber.org/zap" +) + +// Factory connects to stack using standard +// streams (STDIN, STDOUT pipes). +type Factory struct { + log *zap.Logger +} + +// NewPipeFactory returns new factory instance and starts +// listening +func NewPipeFactory(log *zap.Logger) *Factory { + return &Factory{ + log: log, + } +} + +type sr struct { + w *worker.Process + err error +} + +// SpawnWorkerWithTimeout creates new Process and connects it to goridge relay, +// method Wait() must be handled on level above. +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (*worker.Process, error) { + spCh := make(chan sr) + go func() { + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + return + } + } + + in, err := cmd.StdoutPipe() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + return + } + } + + out, err := cmd.StdinPipe() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + return + } + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + return + } + } + + // used as a ping + _, err = internal.Pid(relay) + if err != nil { + _ = w.Kill() + select { + case spCh <- sr{ + w: nil, + err: err, + }: + return + default: + _ = w.Kill() + return + } + } + + select { + case + // return worker + spCh <- sr{ + w: w, + err: nil, + }: + // everything ok, set ready state + w.State().Set(worker.StateReady) + return + default: + _ = w.Kill() + return + } + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-spCh: + if res.err != nil { + return nil, res.err + } + return res.w, nil + } +} + +func (f *Factory) SpawnWorker(cmd *exec.Cmd) (*worker.Process, error) { + w, err := worker.InitBaseWorker(cmd, worker.WithLog(f.log)) + if err != nil { + return nil, err + } + + in, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + + out, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + + // Init new PIPE relay + relay := pipe.NewPipeRelay(in, out) + w.AttachRelay(relay) + + // Start the worker + err = w.Start() + if err != nil { + return nil, err + } + + // errors bundle + _, err = internal.Pid(relay) + if err != nil { + _ = w.Kill() + return nil, err + } + + // everything ok, set ready state + w.State().Set(worker.StateReady) + return w, nil +} + +// Close the factory. +func (f *Factory) Close() error { + return nil +} |