summaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-02 19:17:03 +0300
committerValery Piashchynski <[email protected]>2021-02-02 19:17:03 +0300
commit2bdf7fafa73cabf7cf63657a6b58f2a423ae0fcd (patch)
treefe5efc14a18f78218816a2bc7a2e19ee95642714 /pkg/worker
parent68becf8c58daec426f94513cf444061c199194d7 (diff)
Move worker states out of internal
Diffstat (limited to 'pkg/worker')
-rwxr-xr-xpkg/worker/sync_worker.go17
-rwxr-xr-xpkg/worker/worker.go23
2 files changed, 21 insertions, 19 deletions
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 1a0393fb..696fbdb7 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -10,6 +10,7 @@ import (
"github.com/spiral/goridge/v3/pkg/frame"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
@@ -49,25 +50,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.process.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != states.StateReady {
return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(internal.StateWorking)
+ tw.process.State().Set(states.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(internal.StateErrored)
+ tw.process.State().Set(states.StateErrored)
tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.process.State().Set(internal.StateReady)
+ tw.process.State().Set(states.StateReady)
tw.process.State().RegisterExec()
return rsp, nil
@@ -92,7 +93,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- if tw.process.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != states.StateReady {
c <- wexec{
payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
@@ -102,13 +103,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(internal.StateWorking)
+ tw.process.State().Set(states.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(internal.StateErrored)
+ tw.process.State().Set(states.StateErrored)
tw.process.State().RegisterExec()
}
c <- wexec{
@@ -118,7 +119,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- tw.process.State().Set(internal.StateReady)
+ tw.process.State().Set(states.StateReady)
tw.process.State().RegisterExec()
c <- wexec{
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 8fd71cca..2f1f399d 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -15,6 +15,7 @@ import (
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
@@ -85,7 +86,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
created: time.Now(),
events: events.NewEventsHandler(),
cmd: cmd,
- state: internal.NewWorkerState(internal.StateInactive),
+ state: internal.NewWorkerState(states.StateInactive),
stderr: new(bytes.Buffer),
stop: make(chan struct{}, 1),
// sync pool for STDERR
@@ -190,7 +191,7 @@ func (w *Process) Wait() error {
const op = errors.Op("process_wait")
err := multierr.Combine(w.cmd.Wait())
- if w.State().Value() == internal.StateDestroyed {
+ if w.State().Value() == states.StateDestroyed {
return errors.E(op, err)
}
@@ -199,7 +200,7 @@ func (w *Process) Wait() error {
// and then process.cmd.Wait return an error
w.endState = w.cmd.ProcessState
if err != nil {
- w.state.Set(internal.StateErrored)
+ w.state.Set(states.StateErrored)
w.mu.RLock()
// if process return code > 0, here will be an error from stderr (if presents)
@@ -215,12 +216,12 @@ func (w *Process) Wait() error {
err = multierr.Append(err, w.closeRelay())
if err != nil {
- w.state.Set(internal.StateErrored)
+ w.state.Set(states.StateErrored)
return err
}
if w.endState.Success() {
- w.state.Set(internal.StateStopped)
+ w.state.Set(states.StateStopped)
}
w.stderr.Reset()
@@ -241,20 +242,20 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
var err error
- w.state.Set(internal.StateStopping)
+ w.state.Set(states.StateStopping)
err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
if err != nil {
- w.state.Set(internal.StateKilling)
+ w.state.Set(states.StateKilling)
return multierr.Append(err, w.cmd.Process.Kill())
}
- w.state.Set(internal.StateStopped)
+ w.state.Set(states.StateStopped)
return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
func (w *Process) Kill() error {
- if w.State().Value() == internal.StateDestroyed {
+ if w.State().Value() == states.StateDestroyed {
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
@@ -262,12 +263,12 @@ func (w *Process) Kill() error {
return nil
}
- w.state.Set(internal.StateKilling)
+ w.state.Set(states.StateKilling)
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
}
- w.state.Set(internal.StateStopped)
+ w.state.Set(states.StateStopped)
return nil
}