diff options
-rw-r--r-- | pipe_factory.go | 2 | ||||
-rw-r--r-- | pool_test.go | 2 | ||||
-rw-r--r-- | socket_factory.go | 2 | ||||
-rw-r--r-- | worker.go | 59 |
4 files changed, 31 insertions, 34 deletions
diff --git a/pipe_factory.go b/pipe_factory.go index 36f7a7e3..93e4b5d1 100644 --- a/pipe_factory.go +++ b/pipe_factory.go @@ -40,7 +40,7 @@ func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { w.rl = goridge.NewPipeRelay(in, out) - if err := w.Start(); err != nil { + if err := w.start(); err != nil { return nil, errors.Wrap(err, "process error") } diff --git a/pool_test.go b/pool_test.go index 121191ee..9d9bcca8 100644 --- a/pool_test.go +++ b/pool_test.go @@ -23,6 +23,8 @@ func Test_NewPool(t *testing.T) { NewPipeFactory(), cfg, ) + assert.Equal(t, cfg, p.Config()) + defer p.Destroy() assert.NotNil(t, p) diff --git a/socket_factory.go b/socket_factory.go index a77758e9..acdc91b1 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -45,7 +45,7 @@ func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error) { return nil, err } - if err := w.Start(); err != nil { + if err := w.start(); err != nil { return nil, errors.Wrap(err, "process error") } @@ -87,38 +87,6 @@ func (w *Worker) String() string { ) } -// Start underlying process or return error -func (w *Worker) Start() error { - if w.cmd.Process != nil { - return fmt.Errorf("process already running") - } - - if err := w.cmd.Start(); err != nil { - close(w.waitDone) - - return err - } - - w.Pid = &w.cmd.Process.Pid - - // wait for process to complete - go func() { - w.endState, _ = w.cmd.Process.Wait() - if w.waitDone != nil { - w.state.set(StateStopped) - close(w.waitDone) - - if w.rl != nil { - w.mu.Lock() - defer w.mu.Unlock() - w.rl.Close() - } - } - }() - - return nil -} - // Wait must be called once for each worker, call will be released once worker is // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails @@ -214,6 +182,33 @@ func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { return rsp, err } +func (w *Worker) start() error { + if err := w.cmd.Start(); err != nil { + close(w.waitDone) + + return err + } + + w.Pid = &w.cmd.Process.Pid + + // wait for process to complete + go func() { + w.endState, _ = w.cmd.Process.Wait() + if w.waitDone != nil { + w.state.set(StateStopped) + close(w.waitDone) + + if w.rl != nil { + w.mu.Lock() + defer w.mu.Unlock() + w.rl.Close() + } + } + }() + + return nil +} + func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) { if err := sendHead(w.rl, rqs.Head); err != nil { return nil, errors.Wrap(err, "header error") |