diff options
author | Valery Piashchynski <[email protected]> | 2021-02-02 19:17:03 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-02 19:17:03 +0300 |
commit | 2bdf7fafa73cabf7cf63657a6b58f2a423ae0fcd (patch) | |
tree | fe5efc14a18f78218816a2bc7a2e19ee95642714 /pkg/worker | |
parent | 68becf8c58daec426f94513cf444061c199194d7 (diff) |
Move worker states out of internal
Diffstat (limited to 'pkg/worker')
-rwxr-xr-x | pkg/worker/sync_worker.go | 17 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 23 |
2 files changed, 21 insertions, 19 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 1a0393fb..696fbdb7 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -10,6 +10,7 @@ import ( "github.com/spiral/goridge/v3/pkg/frame" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -49,25 +50,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.process.State().Value() != internal.StateReady { + if tw.process.State().Value() != states.StateReady { return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(internal.StateWorking) + tw.process.State().Set(states.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(internal.StateErrored) + tw.process.State().Set(states.StateErrored) tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.process.State().Set(internal.StateReady) + tw.process.State().Set(states.StateReady) tw.process.State().RegisterExec() return rsp, nil @@ -92,7 +93,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - if tw.process.State().Value() != internal.StateReady { + if tw.process.State().Value() != states.StateReady { c <- wexec{ payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), @@ -102,13 +103,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(internal.StateWorking) + tw.process.State().Set(states.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(internal.StateErrored) + tw.process.State().Set(states.StateErrored) tw.process.State().RegisterExec() } c <- wexec{ @@ -118,7 +119,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - tw.process.State().Set(internal.StateReady) + tw.process.State().Set(states.StateReady) tw.process.State().RegisterExec() c <- wexec{ diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 8fd71cca..2f1f399d 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -15,6 +15,7 @@ import ( "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -85,7 +86,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { created: time.Now(), events: events.NewEventsHandler(), cmd: cmd, - state: internal.NewWorkerState(internal.StateInactive), + state: internal.NewWorkerState(states.StateInactive), stderr: new(bytes.Buffer), stop: make(chan struct{}, 1), // sync pool for STDERR @@ -190,7 +191,7 @@ func (w *Process) Wait() error { const op = errors.Op("process_wait") err := multierr.Combine(w.cmd.Wait()) - if w.State().Value() == internal.StateDestroyed { + if w.State().Value() == states.StateDestroyed { return errors.E(op, err) } @@ -199,7 +200,7 @@ func (w *Process) Wait() error { // and then process.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { - w.state.Set(internal.StateErrored) + w.state.Set(states.StateErrored) w.mu.RLock() // if process return code > 0, here will be an error from stderr (if presents) @@ -215,12 +216,12 @@ func (w *Process) Wait() error { err = multierr.Append(err, w.closeRelay()) if err != nil { - w.state.Set(internal.StateErrored) + w.state.Set(states.StateErrored) return err } if w.endState.Success() { - w.state.Set(internal.StateStopped) + w.state.Set(states.StateStopped) } w.stderr.Reset() @@ -241,20 +242,20 @@ func (w *Process) closeRelay() error { // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { var err error - w.state.Set(internal.StateStopping) + w.state.Set(states.StateStopping) err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) if err != nil { - w.state.Set(internal.StateKilling) + w.state.Set(states.StateKilling) return multierr.Append(err, w.cmd.Process.Kill()) } - w.state.Set(internal.StateStopped) + w.state.Set(states.StateStopped) return nil } // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! func (w *Process) Kill() error { - if w.State().Value() == internal.StateDestroyed { + if w.State().Value() == states.StateDestroyed { err := w.cmd.Process.Signal(os.Kill) if err != nil { return err @@ -262,12 +263,12 @@ func (w *Process) Kill() error { return nil } - w.state.Set(internal.StateKilling) + w.state.Set(states.StateKilling) err := w.cmd.Process.Signal(os.Kill) if err != nil { return err } - w.state.Set(internal.StateStopped) + w.state.Set(states.StateStopped) return nil } |