diff options
Diffstat (limited to 'pkg/worker')
-rw-r--r-- | pkg/worker/interface.go | 22 | ||||
-rwxr-xr-x | pkg/worker/state.go | 104 | ||||
-rwxr-xr-x | pkg/worker/state_test.go | 27 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 20 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 27 |
5 files changed, 173 insertions, 27 deletions
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go index 9d74ae10..61d4a9e4 100644 --- a/pkg/worker/interface.go +++ b/pkg/worker/interface.go @@ -6,10 +6,28 @@ import ( "time" "github.com/spiral/goridge/v3/interfaces/relay" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" ) +// State represents WorkerProcess status and updated time. +type State interface { + fmt.Stringer + // Value returns StateImpl value + Value() int64 + // Set sets the StateImpl + Set(value int64) + // NumJobs shows how many times WorkerProcess was invoked + NumExecs() uint64 + // IsActive returns true if WorkerProcess not Inactive or Stopped + IsActive() bool + // RegisterExec using to registering php executions + RegisterExec() + // SetLastUsed sets worker last used time + SetLastUsed(lu uint64) + // LastUsed return worker last used time + LastUsed() uint64 +} + type BaseProcess interface { fmt.Stringer @@ -21,7 +39,7 @@ type BaseProcess interface { // State return receive-only WorkerProcess state object, state can be used to safely access // WorkerProcess status, time when status changed and number of WorkerProcess executions. - State() internal.State + State() State // Start used to run Cmd and immediately return Start() error diff --git a/pkg/worker/state.go b/pkg/worker/state.go new file mode 100755 index 00000000..54f76c09 --- /dev/null +++ b/pkg/worker/state.go @@ -0,0 +1,104 @@ +package worker + +import ( + "sync/atomic" +) + +const ( + // StateInactive - no associated process + StateInactive int64 = iota + + // StateReady - ready for job. + StateReady + + // StateWorking - working on given payload. + StateWorking + + // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. + StateInvalid + + // StateStopping - process is being softly stopped. + StateStopping + + // StateKilling - process is being forcibly stopped + StateKilling + + // State of worker, when no need to allocate new one + StateDestroyed + + // StateStopped - process has been terminated. + StateStopped + + // StateErrored - error StateImpl (can't be used). + StateErrored + + // StateRemove - worker is killed and removed from the stack + StateRemove +) + +type StateImpl struct { + value int64 + numExecs uint64 + // to be lightweight, use UnixNano + lastUsed uint64 +} + +// Thread safe +func NewWorkerState(value int64) *StateImpl { + return &StateImpl{value: value} +} + +// String returns current StateImpl as string. +func (s *StateImpl) String() string { + switch s.Value() { + case StateInactive: + return "inactive" + case StateReady: + return "ready" + case StateWorking: + return "working" + case StateInvalid: + return "invalid" + case StateStopped: + return "stopped" + case StateErrored: + return "errored" + } + + return "undefined" +} + +// NumExecs returns number of registered WorkerProcess execs. +func (s *StateImpl) NumExecs() uint64 { + return atomic.LoadUint64(&s.numExecs) +} + +// Value StateImpl returns StateImpl value +func (s *StateImpl) Value() int64 { + return atomic.LoadInt64(&s.value) +} + +// IsActive returns true if WorkerProcess not Inactive or Stopped +func (s *StateImpl) IsActive() bool { + val := s.Value() + return val == StateWorking || val == StateReady +} + +// change StateImpl value (status) +func (s *StateImpl) Set(value int64) { + atomic.StoreInt64(&s.value, value) +} + +// register new execution atomically +func (s *StateImpl) RegisterExec() { + atomic.AddUint64(&s.numExecs, 1) +} + +// Update last used time +func (s *StateImpl) SetLastUsed(lu uint64) { + atomic.StoreUint64(&s.lastUsed, lu) +} + +func (s *StateImpl) LastUsed() uint64 { + return atomic.LoadUint64(&s.lastUsed) +} diff --git a/pkg/worker/state_test.go b/pkg/worker/state_test.go new file mode 100755 index 00000000..c67182d6 --- /dev/null +++ b/pkg/worker/state_test.go @@ -0,0 +1,27 @@ +package worker + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_NewState(t *testing.T) { + st := NewWorkerState(StateErrored) + + assert.Equal(t, "errored", st.String()) + + assert.Equal(t, "inactive", NewWorkerState(StateInactive).String()) + assert.Equal(t, "ready", NewWorkerState(StateReady).String()) + assert.Equal(t, "working", NewWorkerState(StateWorking).String()) + assert.Equal(t, "stopped", NewWorkerState(StateStopped).String()) + assert.Equal(t, "undefined", NewWorkerState(1000).String()) +} + +func Test_IsActive(t *testing.T) { + assert.False(t, NewWorkerState(StateInactive).IsActive()) + assert.True(t, NewWorkerState(StateReady).IsActive()) + assert.True(t, NewWorkerState(StateWorking).IsActive()) + assert.False(t, NewWorkerState(StateStopped).IsActive()) + assert.False(t, NewWorkerState(StateErrored).IsActive()) +} diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 010af076..8ed57ac2 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -8,9 +8,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -48,25 +46,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.process.State().Value() != states.StateReady { + if tw.process.State().Value() != StateReady { return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(states.StateWorking) + tw.process.State().Set(StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(states.StateErrored) + tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.process.State().Set(states.StateReady) + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() return rsp, nil @@ -91,7 +89,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - if tw.process.State().Value() != states.StateReady { + if tw.process.State().Value() != StateReady { c <- wexec{ payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), @@ -101,13 +99,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(states.StateWorking) + tw.process.State().Set(StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(states.StateErrored) + tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } c <- wexec{ @@ -117,7 +115,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - tw.process.State().Set(states.StateReady) + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() c <- wexec{ @@ -214,7 +212,7 @@ func (tw *SyncWorkerImpl) Created() time.Time { return tw.process.Created() } -func (tw *SyncWorkerImpl) State() internal.State { +func (tw *SyncWorkerImpl) State() State { return tw.process.State() } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index b726c6f1..f7e8008f 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -39,7 +38,7 @@ type Process struct { // number of Process executions, buf status change time. // publicly this object is receive-only and protected using Mutex // and atomic counter. - state *internal.WorkerState + state *StateImpl // underlying command with associated process, command must be // provided to Process from outside in non-started form. CmdSource @@ -67,7 +66,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { created: time.Now(), events: events.NewEventsHandler(), cmd: cmd, - state: internal.NewWorkerState(states.StateInactive), + state: NewWorkerState(StateInactive), } // set self as stderr implementation (Writer interface) @@ -106,7 +105,7 @@ func (w *Process) addListener(listener events.Listener) { // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) State() internal.State { +func (w *Process) State() State { return w.state } @@ -157,13 +156,13 @@ func (w *Process) Wait() error { err = w.cmd.Wait() // If worker was destroyed, just exit - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == StateDestroyed { return nil } // If state is different, and err is not nil, append it to the errors if err != nil { - w.State().Set(states.StateErrored) + w.State().Set(StateErrored) err = multierr.Combine(err, errors.E(op, err)) } @@ -173,12 +172,12 @@ func (w *Process) Wait() error { // and then process.cmd.Wait return an error err2 := w.closeRelay() if err2 != nil { - w.State().Set(states.StateErrored) + w.State().Set(StateErrored) return multierr.Append(err, errors.E(op, err2)) } if w.cmd.ProcessState.Success() { - w.State().Set(states.StateStopped) + w.State().Set(StateStopped) return nil } @@ -198,20 +197,20 @@ func (w *Process) closeRelay() error { // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { var err error - w.state.Set(states.StateStopping) + w.state.Set(StateStopping) err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) if err != nil { - w.state.Set(states.StateKilling) + w.state.Set(StateKilling) return multierr.Append(err, w.cmd.Process.Kill()) } - w.state.Set(states.StateStopped) + w.state.Set(StateStopped) return nil } // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! func (w *Process) Kill() error { - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == StateDestroyed { err := w.cmd.Process.Signal(os.Kill) if err != nil { return err @@ -219,12 +218,12 @@ func (w *Process) Kill() error { return nil } - w.state.Set(states.StateKilling) + w.state.Set(StateKilling) err := w.cmd.Process.Signal(os.Kill) if err != nil { return err } - w.state.Set(states.StateStopped) + w.state.Set(StateStopped) return nil } |