diff options
Diffstat (limited to 'pkg/worker')
-rwxr-xr-x | pkg/worker/sync_worker.go | 7 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 31 |
2 files changed, 30 insertions, 8 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 11992f22..13212cc6 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -8,7 +8,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/interfaces/events" "github.com/spiral/roadrunner/v2/interfaces/worker" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" @@ -19,7 +18,7 @@ type syncWorker struct { w worker.BaseProcess } -// From creates SyncWorker from WorkerBasa +// From creates SyncWorker from BaseProcess func From(w worker.BaseProcess) (worker.SyncWorker, error) { return &syncWorker{ w: w, @@ -196,10 +195,6 @@ func (tw *syncWorker) Created() time.Time { return tw.w.Created() } -func (tw *syncWorker) AddListener(listener events.EventListener) { - tw.w.AddListener(listener) -} - func (tw *syncWorker) State() internal.State { return tw.w.State() } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 456f4bea..db182a3e 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -29,6 +29,8 @@ const ( ReadBufSize = 10240 // Kb ) +type Options func(p *Process) + // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. @@ -76,7 +78,7 @@ type Process struct { } // InitBaseWorker creates new Process over given exec.cmd. -func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { +func InitBaseWorker(cmd *exec.Cmd, options ...Options) (worker.BaseProcess, error) { if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } @@ -103,6 +105,11 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { // at this point we know, that stderr will contain huge messages w.stderr.Grow(ReadBufSize) + // add options + for i := 0; i < len(options); i++ { + options[i](w) + } + go func() { w.watch() }() @@ -110,6 +117,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { return w, nil } +func AddListeners(listeners ...events.EventListener) Options { + return func(p *Process) { + for i := 0; i < len(listeners); i++ { + p.addListener(listeners[i]) + } + } +} + // Pid returns worker pid. func (w *Process) Pid() int64 { return int64(w.pid) @@ -121,7 +136,7 @@ func (w *Process) Created() time.Time { } // AddListener registers new worker event listener. -func (w *Process) AddListener(listener events.EventListener) { +func (w *Process) addListener(listener events.EventListener) { w.events.AddListener(listener) } @@ -176,6 +191,10 @@ func (w *Process) Wait() error { const op = errors.Op("worker process wait") err := multierr.Combine(w.cmd.Wait()) + if w.State().Value() == internal.StateDestroyed { + return errors.E(op, err) + } + // at this point according to the documentation (see cmd.Wait comment) // if worker finishes with an error, message will be written to the stderr first // and then w.cmd.Wait return an error @@ -234,6 +253,14 @@ func (w *Process) Stop() error { // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! func (w *Process) Kill() error { + if w.State().Value() == internal.StateDestroyed { + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + return nil + } + w.state.Set(internal.StateKilling) err := w.cmd.Process.Signal(os.Kill) if err != nil { |