summaryrefslogtreecommitdiff
path: root/pkg/worker
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/worker')
-rw-r--r--pkg/worker/interface.go24
-rwxr-xr-xpkg/worker/state.go108
-rwxr-xr-xpkg/worker/state_test.go27
-rwxr-xr-xpkg/worker/sync_worker.go39
-rwxr-xr-xpkg/worker/worker.go43
5 files changed, 183 insertions, 58 deletions
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index 9d74ae10..96eb25bc 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -5,11 +5,29 @@ import (
"fmt"
"time"
- "github.com/spiral/goridge/v3/interfaces/relay"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/pkg/payload"
)
+// State represents WorkerProcess status and updated time.
+type State interface {
+ fmt.Stringer
+ // Value returns StateImpl value
+ Value() int64
+ // Set sets the StateImpl
+ Set(value int64)
+ // NumJobs shows how many times WorkerProcess was invoked
+ NumExecs() uint64
+ // IsActive returns true if WorkerProcess not Inactive or Stopped
+ IsActive() bool
+ // RegisterExec using to registering php executions
+ RegisterExec()
+ // SetLastUsed sets worker last used time
+ SetLastUsed(lu uint64)
+ // LastUsed return worker last used time
+ LastUsed() uint64
+}
+
type BaseProcess interface {
fmt.Stringer
@@ -21,7 +39,7 @@ type BaseProcess interface {
// State return receive-only WorkerProcess state object, state can be used to safely access
// WorkerProcess status, time when status changed and number of WorkerProcess executions.
- State() internal.State
+ State() State
// Start used to run Cmd and immediately return
Start() error
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
new file mode 100755
index 00000000..176e151b
--- /dev/null
+++ b/pkg/worker/state.go
@@ -0,0 +1,108 @@
+package worker
+
+import (
+ "sync/atomic"
+)
+
+// SYNC WITH worker_watcher.GET
+const (
+ // StateInactive - no associated process
+ StateInactive int64 = iota
+
+ // StateReady - ready for job.
+ StateReady
+
+ // StateWorking - working on given payload.
+ StateWorking
+
+ // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
+ StateInvalid
+
+ // StateStopping - process is being softly stopped.
+ StateStopping
+
+ // StateKilling - process is being forcibly stopped
+ StateKilling
+
+ // State of worker, when no need to allocate new one
+ StateDestroyed
+
+ // StateStopped - process has been terminated.
+ StateStopped
+
+ // StateErrored - error StateImpl (can't be used).
+ StateErrored
+)
+
+type StateImpl struct {
+ value int64
+ numExecs uint64
+ // to be lightweight, use UnixNano
+ lastUsed uint64
+}
+
+// Thread safe
+func NewWorkerState(value int64) *StateImpl {
+ return &StateImpl{value: value}
+}
+
+// String returns current StateImpl as string.
+func (s *StateImpl) String() string {
+ switch s.Value() {
+ case StateInactive:
+ return "inactive"
+ case StateReady:
+ return "ready"
+ case StateWorking:
+ return "working"
+ case StateInvalid:
+ return "invalid"
+ case StateStopping:
+ return "stopping"
+ case StateStopped:
+ return "stopped"
+ case StateKilling:
+ return "killing"
+ case StateErrored:
+ return "errored"
+ case StateDestroyed:
+ return "destroyed"
+ }
+
+ return "undefined"
+}
+
+// NumExecs returns number of registered WorkerProcess execs.
+func (s *StateImpl) NumExecs() uint64 {
+ return atomic.LoadUint64(&s.numExecs)
+}
+
+// Value StateImpl returns StateImpl value
+func (s *StateImpl) Value() int64 {
+ return atomic.LoadInt64(&s.value)
+}
+
+// IsActive returns true if WorkerProcess not Inactive or Stopped
+func (s *StateImpl) IsActive() bool {
+ val := s.Value()
+ return val == StateWorking || val == StateReady
+}
+
+// change StateImpl value (status)
+func (s *StateImpl) Set(value int64) {
+ atomic.StoreInt64(&s.value, value)
+}
+
+// register new execution atomically
+func (s *StateImpl) RegisterExec() {
+ atomic.AddUint64(&s.numExecs, 1)
+}
+
+// Update last used time
+func (s *StateImpl) SetLastUsed(lu uint64) {
+ atomic.StoreUint64(&s.lastUsed, lu)
+}
+
+func (s *StateImpl) LastUsed() uint64 {
+ return atomic.LoadUint64(&s.lastUsed)
+}
diff --git a/pkg/worker/state_test.go b/pkg/worker/state_test.go
new file mode 100755
index 00000000..c67182d6
--- /dev/null
+++ b/pkg/worker/state_test.go
@@ -0,0 +1,27 @@
+package worker
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NewState(t *testing.T) {
+ st := NewWorkerState(StateErrored)
+
+ assert.Equal(t, "errored", st.String())
+
+ assert.Equal(t, "inactive", NewWorkerState(StateInactive).String())
+ assert.Equal(t, "ready", NewWorkerState(StateReady).String())
+ assert.Equal(t, "working", NewWorkerState(StateWorking).String())
+ assert.Equal(t, "stopped", NewWorkerState(StateStopped).String())
+ assert.Equal(t, "undefined", NewWorkerState(1000).String())
+}
+
+func Test_IsActive(t *testing.T) {
+ assert.False(t, NewWorkerState(StateInactive).IsActive())
+ assert.True(t, NewWorkerState(StateReady).IsActive())
+ assert.True(t, NewWorkerState(StateWorking).IsActive())
+ assert.False(t, NewWorkerState(StateStopped).IsActive())
+ assert.False(t, NewWorkerState(StateErrored).IsActive())
+}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 010af076..82a5462a 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -6,41 +6,26 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
// Allocator is responsible for worker allocation in the pool
-type Allocator func() (*SyncWorkerImpl, error)
+type Allocator func() (SyncWorker, error)
type SyncWorkerImpl struct {
process *Process
}
// From creates SyncWorker from BaseProcess
-func From(process *Process) *SyncWorkerImpl {
+func From(process *Process) SyncWorker {
return &SyncWorkerImpl{
process: process,
}
}
-// FromSync creates BaseProcess from SyncWorkerImpl
-func FromSync(w *SyncWorkerImpl) BaseProcess {
- return &Process{
- created: w.process.created,
- events: w.process.events,
- state: w.process.state,
- cmd: w.process.cmd,
- pid: w.process.pid,
- endState: w.process.endState,
- relay: w.process.relay,
- }
-}
-
// Exec payload without TTL timeout.
func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
const op = errors.Op("sync_worker_exec")
@@ -48,25 +33,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.process.State().Value() != states.StateReady {
+ if tw.process.State().Value() != StateReady {
return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(states.StateWorking)
+ tw.process.State().Set(StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(states.StateErrored)
+ tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.process.State().Set(states.StateReady)
+ tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
return rsp, nil
@@ -91,7 +76,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- if tw.process.State().Value() != states.StateReady {
+ if tw.process.State().Value() != StateReady {
c <- wexec{
payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
@@ -101,13 +86,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(states.StateWorking)
+ tw.process.State().Set(StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(states.StateErrored)
+ tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
c <- wexec{
@@ -117,7 +102,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- tw.process.State().Set(states.StateReady)
+ tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
c <- wexec{
@@ -214,7 +199,7 @@ func (tw *SyncWorkerImpl) Created() time.Time {
return tw.process.Created()
}
-func (tw *SyncWorkerImpl) State() internal.State {
+func (tw *SyncWorkerImpl) State() State {
return tw.process.State()
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index b726c6f1..0f7ab755 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -9,22 +9,12 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/goridge/v3/interfaces/relay"
+ "github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
-const (
- // WaitDuration - for how long error buffer should attempt to aggregate error messages
- // before merging output together since lastError update (required to keep error update together).
- WaitDuration = 25 * time.Millisecond
-
- // ReadBufSize used to make a slice with specified length to read from stderr
- ReadBufSize = 10240 // Kb
-)
-
type Options func(p *Process)
// Process - supervised process with api over goridge.Relay.
@@ -39,7 +29,7 @@ type Process struct {
// number of Process executions, buf status change time.
// publicly this object is receive-only and protected using Mutex
// and atomic counter.
- state *internal.WorkerState
+ state *StateImpl
// underlying command with associated process, command must be
// provided to Process from outside in non-started form. CmdSource
@@ -50,9 +40,6 @@ type Process struct {
// can be nil while process is not started.
pid int
- // contains information about resulted process state.
- endState *os.ProcessState
-
// communication bus with underlying process.
relay relay.Relay
}
@@ -67,7 +54,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
created: time.Now(),
events: events.NewEventsHandler(),
cmd: cmd,
- state: internal.NewWorkerState(states.StateInactive),
+ state: NewWorkerState(StateInactive),
}
// set self as stderr implementation (Writer interface)
@@ -106,7 +93,7 @@ func (w *Process) addListener(listener events.Listener) {
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
-func (w *Process) State() internal.State {
+func (w *Process) State() State {
return w.state
}
@@ -157,13 +144,13 @@ func (w *Process) Wait() error {
err = w.cmd.Wait()
// If worker was destroyed, just exit
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == StateDestroyed {
return nil
}
// If state is different, and err is not nil, append it to the errors
if err != nil {
- w.State().Set(states.StateErrored)
+ w.State().Set(StateErrored)
err = multierr.Combine(err, errors.E(op, err))
}
@@ -173,12 +160,12 @@ func (w *Process) Wait() error {
// and then process.cmd.Wait return an error
err2 := w.closeRelay()
if err2 != nil {
- w.State().Set(states.StateErrored)
+ w.State().Set(StateErrored)
return multierr.Append(err, errors.E(op, err2))
}
if w.cmd.ProcessState.Success() {
- w.State().Set(states.StateStopped)
+ w.State().Set(StateStopped)
return nil
}
@@ -198,20 +185,20 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
var err error
- w.state.Set(states.StateStopping)
+ w.state.Set(StateStopping)
err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
if err != nil {
- w.state.Set(states.StateKilling)
- return multierr.Append(err, w.cmd.Process.Kill())
+ w.state.Set(StateKilling)
+ return multierr.Append(err, w.cmd.Process.Signal(os.Kill))
}
- w.state.Set(states.StateStopped)
+ w.state.Set(StateStopped)
return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
func (w *Process) Kill() error {
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == StateDestroyed {
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
@@ -219,12 +206,12 @@ func (w *Process) Kill() error {
return nil
}
- w.state.Set(states.StateKilling)
+ w.state.Set(StateKilling)
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
}
- w.state.Set(states.StateStopped)
+ w.state.Set(StateStopped)
return nil
}