summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.sum1
-rwxr-xr-xinternal/state.go94
-rwxr-xr-xinternal/state_test.go28
-rwxr-xr-xpkg/pool/static_pool.go25
-rwxr-xr-xpkg/pool/static_pool_test.go6
-rwxr-xr-xpkg/pool/supervisor_pool.go5
-rw-r--r--pkg/states/worker_states.go33
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go5
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go9
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go9
-rwxr-xr-xpkg/transport/socket/socket_factory.go5
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go2
-rw-r--r--pkg/worker/interface.go22
-rwxr-xr-xpkg/worker/state.go104
-rwxr-xr-xpkg/worker/state_test.go27
-rwxr-xr-xpkg/worker/sync_worker.go20
-rwxr-xr-xpkg/worker/worker.go27
-rw-r--r--pkg/worker_watcher/interface.go21
-rw-r--r--pkg/worker_watcher/stack.go3
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go38
20 files changed, 240 insertions, 244 deletions
diff --git a/go.sum b/go.sum
index 3b228b50..6c89410b 100644
--- a/go.sum
+++ b/go.sum
@@ -414,6 +414,7 @@ github.com/spiral/errors v1.0.9 h1:RcVZ7a1RYkaT3HWFGDuQiDB02pG6yqh7715Uwd7urwM=
github.com/spiral/errors v1.0.9/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
github.com/spiral/goridge/v3 v3.0.0 h1:FIz6wHaob5KynpOfzVpzj4bmqbEelGPFyuEf4i2+CG8=
github.com/spiral/goridge/v3 v3.0.0/go.mod h1:XFQGc42KNzo/hPIXPki7mEkFTf9v/T7qFk/TYJjMtzE=
+github.com/spiral/roadrunner v1.9.2 h1:jGtXs3r5fevdbrkDF8BdFxEY4rIZwplnns1oWj7Vyw8=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
diff --git a/internal/state.go b/internal/state.go
deleted file mode 100755
index d208aeed..00000000
--- a/internal/state.go
+++ /dev/null
@@ -1,94 +0,0 @@
-package internal
-
-import (
- "fmt"
- "sync/atomic"
-
- "github.com/spiral/roadrunner/v2/pkg/states"
-)
-
-// State represents WorkerProcess status and updated time.
-type State interface {
- fmt.Stringer
- // Value returns WorkerState value
- Value() int64
- // Set sets the WorkerState
- Set(value int64)
- // NumJobs shows how many times WorkerProcess was invoked
- NumExecs() uint64
- // IsActive returns true if WorkerProcess not Inactive or Stopped
- IsActive() bool
- // RegisterExec using to registering php executions
- RegisterExec()
- // SetLastUsed sets worker last used time
- SetLastUsed(lu uint64)
- // LastUsed return worker last used time
- LastUsed() uint64
-}
-
-type WorkerState struct {
- value int64
- numExecs uint64
- // to be lightweight, use UnixNano
- lastUsed uint64
-}
-
-// Thread safe
-func NewWorkerState(value int64) *WorkerState {
- return &WorkerState{value: value}
-}
-
-// String returns current WorkerState as string.
-func (s *WorkerState) String() string {
- switch s.Value() {
- case states.StateInactive:
- return "inactive"
- case states.StateReady:
- return "ready"
- case states.StateWorking:
- return "working"
- case states.StateInvalid:
- return "invalid"
- case states.StateStopped:
- return "stopped"
- case states.StateErrored:
- return "errored"
- }
-
- return "undefined"
-}
-
-// NumExecs returns number of registered WorkerProcess execs.
-func (s *WorkerState) NumExecs() uint64 {
- return atomic.LoadUint64(&s.numExecs)
-}
-
-// Value WorkerState returns WorkerState value
-func (s *WorkerState) Value() int64 {
- return atomic.LoadInt64(&s.value)
-}
-
-// IsActive returns true if WorkerProcess not Inactive or Stopped
-func (s *WorkerState) IsActive() bool {
- val := s.Value()
- return val == states.StateWorking || val == states.StateReady
-}
-
-// change WorkerState value (status)
-func (s *WorkerState) Set(value int64) {
- atomic.StoreInt64(&s.value, value)
-}
-
-// register new execution atomically
-func (s *WorkerState) RegisterExec() {
- atomic.AddUint64(&s.numExecs, 1)
-}
-
-// Update last used time
-func (s *WorkerState) SetLastUsed(lu uint64) {
- atomic.StoreUint64(&s.lastUsed, lu)
-}
-
-func (s *WorkerState) LastUsed() uint64 {
- return atomic.LoadUint64(&s.lastUsed)
-}
diff --git a/internal/state_test.go b/internal/state_test.go
deleted file mode 100755
index a0581d57..00000000
--- a/internal/state_test.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package internal
-
-import (
- "testing"
-
- "github.com/spiral/roadrunner/v2/pkg/states"
- "github.com/stretchr/testify/assert"
-)
-
-func Test_NewState(t *testing.T) {
- st := NewWorkerState(states.StateErrored)
-
- assert.Equal(t, "errored", st.String())
-
- assert.Equal(t, "inactive", NewWorkerState(states.StateInactive).String())
- assert.Equal(t, "ready", NewWorkerState(states.StateReady).String())
- assert.Equal(t, "working", NewWorkerState(states.StateWorking).String())
- assert.Equal(t, "stopped", NewWorkerState(states.StateStopped).String())
- assert.Equal(t, "undefined", NewWorkerState(1000).String())
-}
-
-func Test_IsActive(t *testing.T) {
- assert.False(t, NewWorkerState(states.StateInactive).IsActive())
- assert.True(t, NewWorkerState(states.StateReady).IsActive())
- assert.True(t, NewWorkerState(states.StateWorking).IsActive())
- assert.False(t, NewWorkerState(states.StateStopped).IsActive())
- assert.False(t, NewWorkerState(states.StateErrored).IsActive())
-}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 01b0574d..72c3d4df 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -8,7 +8,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport"
"github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
@@ -84,7 +83,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg
}
// put stack in the pool
- err = p.ww.AddToWatch(workers)
+ err = p.ww.Watch(workers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -123,11 +122,11 @@ func (sp *StaticPool) GetConfig() interface{} {
// Workers returns worker list associated with the pool.
func (sp *StaticPool) Workers() (workers []worker.SyncWorker) {
- return sp.ww.WorkersList()
+ return sp.ww.List()
}
func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error {
- return sp.ww.RemoveWorker(wb)
+ return sp.ww.Remove(wb)
}
// Be careful, sync Exec with ExecWithContext
@@ -195,7 +194,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
const op = errors.Op("static_pool_stop_worker")
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
err := w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
@@ -206,19 +205,19 @@ func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error {
const op = errors.Op("static_pool_check_max_jobs")
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err := sp.ww.AllocateNew()
+ err := sp.ww.Allocate()
if err != nil {
return errors.E(op, err)
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.Push(w)
}
return nil
}
func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
- // GetFreeWorker function consumes context with timeout
- w, err := sp.ww.GetFreeWorker(ctxGetFree)
+ // Get function consumes context with timeout
+ w, err := sp.ww.Get(ctxGetFree)
if err != nil {
// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
if errors.Is(errors.NoFreeWorkers, err) {
@@ -246,24 +245,24 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
// soft job errors are allowed
if errors.Is(errors.SoftJob, err) {
if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
+ err = sp.ww.Allocate()
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.Push(w)
}
return payload.Payload{}, errors.E(op, err)
}
- w.State().Set(states.StateInvalid)
+ w.State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 4cfd5ec6..30a4ebaf 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -255,7 +255,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
list := p.Workers()
for _, w := range list {
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
}
}
@@ -462,7 +462,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
assert.NotNil(t, p)
for i := range p.Workers() {
- p.Workers()[i].State().Set(states.StateErrored)
+ p.Workers()[i].State().Set(worker.StateErrored)
}
_, err = p.Exec(payload.Payload{Body: []byte("hello")})
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 3347ecd4..33438ae6 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -8,7 +8,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/tools"
)
@@ -144,7 +143,7 @@ func (sp *supervised) control() {
workers := sp.pool.Workers()
for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == states.StateInvalid {
+ if workers[i].State().Value() == worker.StateInvalid {
continue
}
@@ -177,7 +176,7 @@ func (sp *supervised) control() {
// firs we check maxWorker idle
if sp.cfg.IdleTTL != 0 {
// then check for the worker state
- if workers[i].State().Value() != states.StateReady {
+ if workers[i].State().Value() != worker.StateReady {
continue
}
diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go
deleted file mode 100644
index fe653cb4..00000000
--- a/pkg/states/worker_states.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package states
-
-const (
- // StateInactive - no associated process
- StateInactive int64 = iota
-
- // StateReady - ready for job.
- StateReady
-
- // StateWorking - working on given payload.
- StateWorking
-
- // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
- StateInvalid
-
- // StateStopping - process is being softly stopped.
- StateStopping
-
- // StateKilling - process is being forcibly stopped
- StateKilling
-
- // State of worker, when no need to allocate new one
- StateDestroyed
-
- // StateStopped - process has been terminated.
- StateStopped
-
- // StateErrored - error WorkerState (can't be used).
- StateErrored
-
- // StateRemove - worker is killed and removed from the stack
- StateRemove
-)
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
index 929043a7..b12ff36a 100755
--- a/pkg/transport/pipe/pipe_factory.go
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -8,7 +8,6 @@ import (
"github.com/spiral/goridge/v3/pkg/pipe"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
)
@@ -93,7 +92,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
// everything ok, set ready state
- w.State().Set(states.StateReady)
+ w.State().Set(worker.StateReady)
// return worker
c <- SpawnResult{
@@ -153,7 +152,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
// everything ok, set ready state
- w.State().Set(states.StateReady)
+ w.State().Set(worker.StateReady)
return w, nil
}
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 73008471..a00b2117 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -21,13 +20,13 @@ func Test_GetState2(t *testing.T) {
w, err := NewPipeFactory().SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
- assert.Equal(t, states.StateStopped, w.State().Value())
+ assert.Equal(t, worker.StateStopped, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
assert.NoError(t, w.Stop())
}
@@ -40,13 +39,13 @@ func Test_Kill2(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- assert.Equal(t, states.StateErrored, w.State().Value())
+ assert.Equal(t, worker.StateErrored, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
err = w.Kill()
if err != nil {
t.Errorf("error killing the Process: error %v", err)
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index 3efeb59c..fb40ecb0 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -11,7 +11,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -23,13 +22,13 @@ func Test_GetState(t *testing.T) {
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
- assert.Equal(t, states.StateStopped, w.State().Value())
+ assert.Equal(t, worker.StateStopped, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
@@ -46,13 +45,13 @@ func Test_Kill(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- assert.Equal(t, states.StateErrored, w.State().Value())
+ assert.Equal(t, worker.StateErrored, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, states.StateReady, w.State().Value())
+ assert.Equal(t, worker.StateReady, w.State().Value())
err = w.Kill()
if err != nil {
t.Errorf("error killing the Process: error %v", err)
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index 98bd2389..4c9a386a 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -13,7 +13,6 @@ import (
"github.com/spiral/goridge/v3/pkg/socket"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
@@ -125,7 +124,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
w.AttachRelay(rl)
- w.State().Set(states.StateReady)
+ w.State().Set(worker.StateReady)
c <- socketSpawn{
w: w,
@@ -168,7 +167,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
w.AttachRelay(rl)
- w.State().Set(states.StateReady)
+ w.State().Set(worker.StateReady)
return w, nil
}
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index c13a897b..a8dd0fe0 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -401,7 +401,7 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
- block := make(chan struct{})
+ block := make(chan struct{}, 10)
listener := func(event interface{}) {
if wev, ok := event.(events.WorkerEvent); ok {
if wev.Event == events.EventWorkerStderr {
diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go
index 9d74ae10..61d4a9e4 100644
--- a/pkg/worker/interface.go
+++ b/pkg/worker/interface.go
@@ -6,10 +6,28 @@ import (
"time"
"github.com/spiral/goridge/v3/interfaces/relay"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
)
+// State represents WorkerProcess status and updated time.
+type State interface {
+ fmt.Stringer
+ // Value returns StateImpl value
+ Value() int64
+ // Set sets the StateImpl
+ Set(value int64)
+ // NumJobs shows how many times WorkerProcess was invoked
+ NumExecs() uint64
+ // IsActive returns true if WorkerProcess not Inactive or Stopped
+ IsActive() bool
+ // RegisterExec using to registering php executions
+ RegisterExec()
+ // SetLastUsed sets worker last used time
+ SetLastUsed(lu uint64)
+ // LastUsed return worker last used time
+ LastUsed() uint64
+}
+
type BaseProcess interface {
fmt.Stringer
@@ -21,7 +39,7 @@ type BaseProcess interface {
// State return receive-only WorkerProcess state object, state can be used to safely access
// WorkerProcess status, time when status changed and number of WorkerProcess executions.
- State() internal.State
+ State() State
// Start used to run Cmd and immediately return
Start() error
diff --git a/pkg/worker/state.go b/pkg/worker/state.go
new file mode 100755
index 00000000..54f76c09
--- /dev/null
+++ b/pkg/worker/state.go
@@ -0,0 +1,104 @@
+package worker
+
+import (
+ "sync/atomic"
+)
+
+const (
+ // StateInactive - no associated process
+ StateInactive int64 = iota
+
+ // StateReady - ready for job.
+ StateReady
+
+ // StateWorking - working on given payload.
+ StateWorking
+
+ // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
+ StateInvalid
+
+ // StateStopping - process is being softly stopped.
+ StateStopping
+
+ // StateKilling - process is being forcibly stopped
+ StateKilling
+
+ // State of worker, when no need to allocate new one
+ StateDestroyed
+
+ // StateStopped - process has been terminated.
+ StateStopped
+
+ // StateErrored - error StateImpl (can't be used).
+ StateErrored
+
+ // StateRemove - worker is killed and removed from the stack
+ StateRemove
+)
+
+type StateImpl struct {
+ value int64
+ numExecs uint64
+ // to be lightweight, use UnixNano
+ lastUsed uint64
+}
+
+// Thread safe
+func NewWorkerState(value int64) *StateImpl {
+ return &StateImpl{value: value}
+}
+
+// String returns current StateImpl as string.
+func (s *StateImpl) String() string {
+ switch s.Value() {
+ case StateInactive:
+ return "inactive"
+ case StateReady:
+ return "ready"
+ case StateWorking:
+ return "working"
+ case StateInvalid:
+ return "invalid"
+ case StateStopped:
+ return "stopped"
+ case StateErrored:
+ return "errored"
+ }
+
+ return "undefined"
+}
+
+// NumExecs returns number of registered WorkerProcess execs.
+func (s *StateImpl) NumExecs() uint64 {
+ return atomic.LoadUint64(&s.numExecs)
+}
+
+// Value StateImpl returns StateImpl value
+func (s *StateImpl) Value() int64 {
+ return atomic.LoadInt64(&s.value)
+}
+
+// IsActive returns true if WorkerProcess not Inactive or Stopped
+func (s *StateImpl) IsActive() bool {
+ val := s.Value()
+ return val == StateWorking || val == StateReady
+}
+
+// change StateImpl value (status)
+func (s *StateImpl) Set(value int64) {
+ atomic.StoreInt64(&s.value, value)
+}
+
+// register new execution atomically
+func (s *StateImpl) RegisterExec() {
+ atomic.AddUint64(&s.numExecs, 1)
+}
+
+// Update last used time
+func (s *StateImpl) SetLastUsed(lu uint64) {
+ atomic.StoreUint64(&s.lastUsed, lu)
+}
+
+func (s *StateImpl) LastUsed() uint64 {
+ return atomic.LoadUint64(&s.lastUsed)
+}
diff --git a/pkg/worker/state_test.go b/pkg/worker/state_test.go
new file mode 100755
index 00000000..c67182d6
--- /dev/null
+++ b/pkg/worker/state_test.go
@@ -0,0 +1,27 @@
+package worker
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_NewState(t *testing.T) {
+ st := NewWorkerState(StateErrored)
+
+ assert.Equal(t, "errored", st.String())
+
+ assert.Equal(t, "inactive", NewWorkerState(StateInactive).String())
+ assert.Equal(t, "ready", NewWorkerState(StateReady).String())
+ assert.Equal(t, "working", NewWorkerState(StateWorking).String())
+ assert.Equal(t, "stopped", NewWorkerState(StateStopped).String())
+ assert.Equal(t, "undefined", NewWorkerState(1000).String())
+}
+
+func Test_IsActive(t *testing.T) {
+ assert.False(t, NewWorkerState(StateInactive).IsActive())
+ assert.True(t, NewWorkerState(StateReady).IsActive())
+ assert.True(t, NewWorkerState(StateWorking).IsActive())
+ assert.False(t, NewWorkerState(StateStopped).IsActive())
+ assert.False(t, NewWorkerState(StateErrored).IsActive())
+}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 010af076..8ed57ac2 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -8,9 +8,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
- "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
@@ -48,25 +46,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.process.State().Value() != states.StateReady {
+ if tw.process.State().Value() != StateReady {
return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(states.StateWorking)
+ tw.process.State().Set(StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(states.StateErrored)
+ tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.process.State().Set(states.StateReady)
+ tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
return rsp, nil
@@ -91,7 +89,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- if tw.process.State().Value() != states.StateReady {
+ if tw.process.State().Value() != StateReady {
c <- wexec{
payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
@@ -101,13 +99,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(states.StateWorking)
+ tw.process.State().Set(StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(states.StateErrored)
+ tw.process.State().Set(StateErrored)
tw.process.State().RegisterExec()
}
c <- wexec{
@@ -117,7 +115,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- tw.process.State().Set(states.StateReady)
+ tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
c <- wexec{
@@ -214,7 +212,7 @@ func (tw *SyncWorkerImpl) Created() time.Time {
return tw.process.Created()
}
-func (tw *SyncWorkerImpl) State() internal.State {
+func (tw *SyncWorkerImpl) State() State {
return tw.process.State()
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index b726c6f1..f7e8008f 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -12,7 +12,6 @@ import (
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
@@ -39,7 +38,7 @@ type Process struct {
// number of Process executions, buf status change time.
// publicly this object is receive-only and protected using Mutex
// and atomic counter.
- state *internal.WorkerState
+ state *StateImpl
// underlying command with associated process, command must be
// provided to Process from outside in non-started form. CmdSource
@@ -67,7 +66,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
created: time.Now(),
events: events.NewEventsHandler(),
cmd: cmd,
- state: internal.NewWorkerState(states.StateInactive),
+ state: NewWorkerState(StateInactive),
}
// set self as stderr implementation (Writer interface)
@@ -106,7 +105,7 @@ func (w *Process) addListener(listener events.Listener) {
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
-func (w *Process) State() internal.State {
+func (w *Process) State() State {
return w.state
}
@@ -157,13 +156,13 @@ func (w *Process) Wait() error {
err = w.cmd.Wait()
// If worker was destroyed, just exit
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == StateDestroyed {
return nil
}
// If state is different, and err is not nil, append it to the errors
if err != nil {
- w.State().Set(states.StateErrored)
+ w.State().Set(StateErrored)
err = multierr.Combine(err, errors.E(op, err))
}
@@ -173,12 +172,12 @@ func (w *Process) Wait() error {
// and then process.cmd.Wait return an error
err2 := w.closeRelay()
if err2 != nil {
- w.State().Set(states.StateErrored)
+ w.State().Set(StateErrored)
return multierr.Append(err, errors.E(op, err2))
}
if w.cmd.ProcessState.Success() {
- w.State().Set(states.StateStopped)
+ w.State().Set(StateStopped)
return nil
}
@@ -198,20 +197,20 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
var err error
- w.state.Set(states.StateStopping)
+ w.state.Set(StateStopping)
err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
if err != nil {
- w.state.Set(states.StateKilling)
+ w.state.Set(StateKilling)
return multierr.Append(err, w.cmd.Process.Kill())
}
- w.state.Set(states.StateStopped)
+ w.state.Set(StateStopped)
return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
func (w *Process) Kill() error {
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == StateDestroyed {
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
@@ -219,12 +218,12 @@ func (w *Process) Kill() error {
return nil
}
- w.state.Set(states.StateKilling)
+ w.state.Set(StateKilling)
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
}
- w.state.Set(states.StateStopped)
+ w.state.Set(StateStopped)
return nil
}
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
index 927aa270..ce5011c0 100644
--- a/pkg/worker_watcher/interface.go
+++ b/pkg/worker_watcher/interface.go
@@ -6,25 +6,26 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker"
)
+// Watcher is an interface for the Sync workers lifecycle
type Watcher interface {
- // AddToWatch used to add stack to wait its state
- AddToWatch(workers []worker.SyncWorker) error
+ // Watch used to add workers to the stack
+ Watch(workers []worker.SyncWorker) error
- // GetFreeWorker provide first free worker
- GetFreeWorker(ctx context.Context) (worker.SyncWorker, error)
+ // Get provide first free worker
+ Get(ctx context.Context) (worker.SyncWorker, error)
- // PutWorker enqueues worker back
- PushWorker(w worker.SyncWorker)
+ // Push enqueues worker back
+ Push(w worker.SyncWorker)
- // AllocateNew used to allocate new worker and put in into the WorkerWatcher
- AllocateNew() error
+ // Allocate - allocates new worker and put it into the WorkerWatcher
+ Allocate() error
// Destroy destroys the underlying stack
Destroy(ctx context.Context)
// WorkersList return all stack w/o removing it from internal storage
- WorkersList() []worker.SyncWorker
+ List() []worker.SyncWorker
// RemoveWorker remove worker from the stack
- RemoveWorker(wb worker.SyncWorker) error
+ Remove(wb worker.SyncWorker) error
}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index 55034e41..51c3d016 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -5,7 +5,6 @@ import (
"sync"
"time"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -129,7 +128,7 @@ func (stack *Stack) Destroy(ctx context.Context) {
stack.mutex.Lock()
for i := 0; i < len(stack.workers); i++ {
// set state for the stack in the stack (unused at the moment)
- stack.workers[i].State().Set(states.StateDestroyed)
+ stack.workers[i].State().Set(worker.StateDestroyed)
// kill the worker
_ = stack.workers[i].Kill()
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index cf2e1eb7..6b9e9dbf 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -6,7 +6,6 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -28,7 +27,7 @@ type workerWatcher struct {
events events.Handler
}
-func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
+func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error {
for i := 0; i < len(workers); i++ {
ww.stack.Push(workers[i])
@@ -39,7 +38,7 @@ func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error {
return nil
}
-func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) {
+func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) {
const op = errors.Op("worker_watcher_get_free_worker")
// thread safe operation
w, stop := ww.stack.Pop()
@@ -49,14 +48,25 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker,
// handle worker remove state
// in this state worker is destroyed by supervisor
- if w != nil && w.State().Value() == states.StateRemove {
- err := ww.RemoveWorker(w)
+ if w != nil && w.State().Value() == worker.StateRemove {
+ err := ww.Remove(w)
if err != nil {
return nil, err
}
// try to get next
- return ww.GetFreeWorker(ctx)
+ return ww.Get(ctx)
}
+
+ // if worker not in the ready state it possibly corrupted
+ if w != nil && w.State().Value() != worker.StateReady {
+ err := ww.Remove(w)
+ if err != nil {
+ return nil, err
+ }
+ // try to get next
+ return ww.Get(ctx)
+ }
+
// no free stack
if w == nil {
for {
@@ -79,7 +89,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker,
return w, nil
}
-func (ww *workerWatcher) AllocateNew() error {
+func (ww *workerWatcher) Allocate() error {
ww.stack.mutex.Lock()
const op = errors.Op("worker_watcher_allocate_new")
sw, err := ww.allocator()
@@ -89,12 +99,12 @@ func (ww *workerWatcher) AllocateNew() error {
ww.addToWatch(sw)
ww.stack.mutex.Unlock()
- ww.PushWorker(sw)
+ ww.Push(sw)
return nil
}
-func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
+func (ww *workerWatcher) Remove(wb worker.SyncWorker) error {
ww.mutex.Lock()
defer ww.mutex.Unlock()
@@ -102,7 +112,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
pid := wb.Pid()
if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(states.StateRemove)
+ wb.State().Set(worker.StateRemove)
err := wb.Kill()
if err != nil {
return errors.E(op, err)
@@ -114,7 +124,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
}
// O(1) operation
-func (ww *workerWatcher) PushWorker(w worker.SyncWorker) {
+func (ww *workerWatcher) Push(w worker.SyncWorker) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
ww.stack.Push(w)
@@ -127,7 +137,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation, and it will return copy of the actual workers
-func (ww *workerWatcher) WorkersList() []worker.SyncWorker {
+func (ww *workerWatcher) List() []worker.SyncWorker {
return ww.stack.Workers()
}
@@ -142,14 +152,14 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
- if w.State().Value() == states.StateDestroyed {
+ if w.State().Value() == worker.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return
}
_ = ww.stack.FindAndRemoveByPid(w.Pid())
- err = ww.AllocateNew()
+ err = ww.Allocate()
if err != nil {
ww.events.Push(events.PoolEvent{
Event: events.EventPoolError,