diff options
author | Valery Piashchynski <[email protected]> | 2021-02-04 20:37:48 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-04 20:37:48 +0300 |
commit | d629f08408a4478aaba90079a4e37ab69cfc12ef (patch) | |
tree | 2cb67bc5c9be295428239369e9d211f3888308fe /pkg | |
parent | efacb852e279e6bbfc076c0faff391ff39815718 (diff) |
pre-rc stabilization of the interfaces and internal code
Diffstat (limited to 'pkg')
-rwxr-xr-x | pkg/pool/static_pool.go | 25 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 6 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 5 | ||||
-rw-r--r-- | pkg/states/worker_states.go | 33 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory.go | 5 | ||||
-rw-r--r-- | pkg/transport/pipe/pipe_factory_spawn_test.go | 9 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 9 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory.go | 5 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory_test.go | 2 | ||||
-rw-r--r-- | pkg/worker/interface.go | 22 | ||||
-rwxr-xr-x | pkg/worker/state.go | 104 | ||||
-rwxr-xr-x | pkg/worker/state_test.go | 27 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 20 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 27 | ||||
-rw-r--r-- | pkg/worker_watcher/interface.go | 21 | ||||
-rw-r--r-- | pkg/worker_watcher/stack.go | 3 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 38 |
17 files changed, 239 insertions, 122 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 01b0574d..72c3d4df 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -8,7 +8,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/transport" "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" @@ -84,7 +83,7 @@ func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg } // put stack in the pool - err = p.ww.AddToWatch(workers) + err = p.ww.Watch(workers) if err != nil { return nil, errors.E(op, err) } @@ -123,11 +122,11 @@ func (sp *StaticPool) GetConfig() interface{} { // Workers returns worker list associated with the pool. func (sp *StaticPool) Workers() (workers []worker.SyncWorker) { - return sp.ww.WorkersList() + return sp.ww.List() } func (sp *StaticPool) RemoveWorker(wb worker.SyncWorker) error { - return sp.ww.RemoveWorker(wb) + return sp.ww.Remove(wb) } // Be careful, sync Exec with ExecWithContext @@ -195,7 +194,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p func (sp *StaticPool) stopWorker(w worker.SyncWorker) { const op = errors.Op("static_pool_stop_worker") - w.State().Set(states.StateInvalid) + w.State().Set(worker.StateInvalid) err := w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) @@ -206,19 +205,19 @@ func (sp *StaticPool) stopWorker(w worker.SyncWorker) { func (sp *StaticPool) checkMaxJobs(w worker.SyncWorker) error { const op = errors.Op("static_pool_check_max_jobs") if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err := sp.ww.AllocateNew() + err := sp.ww.Allocate() if err != nil { return errors.E(op, err) } } else { - sp.ww.PushWorker(w) + sp.ww.Push(w) } return nil } func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { - // GetFreeWorker function consumes context with timeout - w, err := sp.ww.GetFreeWorker(ctxGetFree) + // Get function consumes context with timeout + w, err := sp.ww.Get(ctxGetFree) if err != nil { // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout if errors.Is(errors.NoFreeWorkers, err) { @@ -246,24 +245,24 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { // soft job errors are allowed if errors.Is(errors.SoftJob, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err = sp.ww.AllocateNew() + err = sp.ww.Allocate() if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) } - w.State().Set(states.StateInvalid) + w.State().Set(worker.StateInvalid) err = w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } } else { - sp.ww.PushWorker(w) + sp.ww.Push(w) } return payload.Payload{}, errors.E(op, err) } - w.State().Set(states.StateInvalid) + w.State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop() diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 4cfd5ec6..30a4ebaf 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -14,8 +14,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -255,7 +255,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { list := p.Workers() for _, w := range list { - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, worker.StateReady, w.State().Value()) } } @@ -462,7 +462,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { assert.NotNil(t, p) for i := range p.Workers() { - p.Workers()[i].State().Set(states.StateErrored) + p.Workers()[i].State().Set(worker.StateErrored) } _, err = p.Exec(payload.Payload{Body: []byte("hello")}) diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 3347ecd4..33438ae6 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -8,7 +8,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/tools" ) @@ -144,7 +143,7 @@ func (sp *supervised) control() { workers := sp.pool.Workers() for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == states.StateInvalid { + if workers[i].State().Value() == worker.StateInvalid { continue } @@ -177,7 +176,7 @@ func (sp *supervised) control() { // firs we check maxWorker idle if sp.cfg.IdleTTL != 0 { // then check for the worker state - if workers[i].State().Value() != states.StateReady { + if workers[i].State().Value() != worker.StateReady { continue } diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go deleted file mode 100644 index fe653cb4..00000000 --- a/pkg/states/worker_states.go +++ /dev/null @@ -1,33 +0,0 @@ -package states - -const ( - // StateInactive - no associated process - StateInactive int64 = iota - - // StateReady - ready for job. - StateReady - - // StateWorking - working on given payload. - StateWorking - - // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. - StateInvalid - - // StateStopping - process is being softly stopped. - StateStopping - - // StateKilling - process is being forcibly stopped - StateKilling - - // State of worker, when no need to allocate new one - StateDestroyed - - // StateStopped - process has been terminated. - StateStopped - - // StateErrored - error WorkerState (can't be used). - StateErrored - - // StateRemove - worker is killed and removed from the stack - StateRemove -) diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index 929043a7..b12ff36a 100755 --- a/pkg/transport/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -8,7 +8,6 @@ import ( "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" ) @@ -93,7 +92,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } // everything ok, set ready state - w.State().Set(states.StateReady) + w.State().Set(worker.StateReady) // return worker c <- SpawnResult{ @@ -153,7 +152,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } // everything ok, set ready state - w.State().Set(states.StateReady) + w.State().Set(worker.StateReady) return w, nil } diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index 73008471..a00b2117 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -10,7 +10,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -21,13 +20,13 @@ func Test_GetState2(t *testing.T) { w, err := NewPipeFactory().SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) - assert.Equal(t, states.StateStopped, w.State().Value()) + assert.Equal(t, worker.StateStopped, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, worker.StateReady, w.State().Value()) assert.NoError(t, w.Stop()) } @@ -40,13 +39,13 @@ func Test_Kill2(t *testing.T) { go func() { defer wg.Done() assert.Error(t, w.Wait()) - assert.Equal(t, states.StateErrored, w.State().Value()) + assert.Equal(t, worker.StateErrored, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, worker.StateReady, w.State().Value()) err = w.Kill() if err != nil { t.Errorf("error killing the Process: error %v", err) diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index 3efeb59c..fb40ecb0 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -11,7 +11,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -23,13 +22,13 @@ func Test_GetState(t *testing.T) { w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) - assert.Equal(t, states.StateStopped, w.State().Value()) + assert.Equal(t, worker.StateStopped, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, worker.StateReady, w.State().Value()) err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) @@ -46,13 +45,13 @@ func Test_Kill(t *testing.T) { go func() { defer wg.Done() assert.Error(t, w.Wait()) - assert.Equal(t, states.StateErrored, w.State().Value()) + assert.Equal(t, worker.StateErrored, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, states.StateReady, w.State().Value()) + assert.Equal(t, worker.StateReady, w.State().Value()) err = w.Kill() if err != nil { t.Errorf("error killing the Process: error %v", err) diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index 98bd2389..4c9a386a 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -13,7 +13,6 @@ import ( "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" @@ -125,7 +124,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } w.AttachRelay(rl) - w.State().Set(states.StateReady) + w.State().Set(worker.StateReady) c <- socketSpawn{ w: w, @@ -168,7 +167,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } w.AttachRelay(rl) - w.State().Set(states.StateReady) + w.State().Set(worker.StateReady) return w, nil } diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index c13a897b..a8dd0fe0 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -401,7 +401,7 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - block := make(chan struct{}) + block := make(chan struct{}, 10) listener := func(event interface{}) { if wev, ok := event.(events.WorkerEvent); ok { if wev.Event == events.EventWorkerStderr { diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go index 9d74ae10..61d4a9e4 100644 --- a/pkg/worker/interface.go +++ b/pkg/worker/interface.go @@ -6,10 +6,28 @@ import ( "time" "github.com/spiral/goridge/v3/interfaces/relay" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" ) +// State represents WorkerProcess status and updated time. +type State interface { + fmt.Stringer + // Value returns StateImpl value + Value() int64 + // Set sets the StateImpl + Set(value int64) + // NumJobs shows how many times WorkerProcess was invoked + NumExecs() uint64 + // IsActive returns true if WorkerProcess not Inactive or Stopped + IsActive() bool + // RegisterExec using to registering php executions + RegisterExec() + // SetLastUsed sets worker last used time + SetLastUsed(lu uint64) + // LastUsed return worker last used time + LastUsed() uint64 +} + type BaseProcess interface { fmt.Stringer @@ -21,7 +39,7 @@ type BaseProcess interface { // State return receive-only WorkerProcess state object, state can be used to safely access // WorkerProcess status, time when status changed and number of WorkerProcess executions. - State() internal.State + State() State // Start used to run Cmd and immediately return Start() error diff --git a/pkg/worker/state.go b/pkg/worker/state.go new file mode 100755 index 00000000..54f76c09 --- /dev/null +++ b/pkg/worker/state.go @@ -0,0 +1,104 @@ +package worker + +import ( + "sync/atomic" +) + +const ( + // StateInactive - no associated process + StateInactive int64 = iota + + // StateReady - ready for job. + StateReady + + // StateWorking - working on given payload. + StateWorking + + // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. + StateInvalid + + // StateStopping - process is being softly stopped. + StateStopping + + // StateKilling - process is being forcibly stopped + StateKilling + + // State of worker, when no need to allocate new one + StateDestroyed + + // StateStopped - process has been terminated. + StateStopped + + // StateErrored - error StateImpl (can't be used). + StateErrored + + // StateRemove - worker is killed and removed from the stack + StateRemove +) + +type StateImpl struct { + value int64 + numExecs uint64 + // to be lightweight, use UnixNano + lastUsed uint64 +} + +// Thread safe +func NewWorkerState(value int64) *StateImpl { + return &StateImpl{value: value} +} + +// String returns current StateImpl as string. +func (s *StateImpl) String() string { + switch s.Value() { + case StateInactive: + return "inactive" + case StateReady: + return "ready" + case StateWorking: + return "working" + case StateInvalid: + return "invalid" + case StateStopped: + return "stopped" + case StateErrored: + return "errored" + } + + return "undefined" +} + +// NumExecs returns number of registered WorkerProcess execs. +func (s *StateImpl) NumExecs() uint64 { + return atomic.LoadUint64(&s.numExecs) +} + +// Value StateImpl returns StateImpl value +func (s *StateImpl) Value() int64 { + return atomic.LoadInt64(&s.value) +} + +// IsActive returns true if WorkerProcess not Inactive or Stopped +func (s *StateImpl) IsActive() bool { + val := s.Value() + return val == StateWorking || val == StateReady +} + +// change StateImpl value (status) +func (s *StateImpl) Set(value int64) { + atomic.StoreInt64(&s.value, value) +} + +// register new execution atomically +func (s *StateImpl) RegisterExec() { + atomic.AddUint64(&s.numExecs, 1) +} + +// Update last used time +func (s *StateImpl) SetLastUsed(lu uint64) { + atomic.StoreUint64(&s.lastUsed, lu) +} + +func (s *StateImpl) LastUsed() uint64 { + return atomic.LoadUint64(&s.lastUsed) +} diff --git a/pkg/worker/state_test.go b/pkg/worker/state_test.go new file mode 100755 index 00000000..c67182d6 --- /dev/null +++ b/pkg/worker/state_test.go @@ -0,0 +1,27 @@ +package worker + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_NewState(t *testing.T) { + st := NewWorkerState(StateErrored) + + assert.Equal(t, "errored", st.String()) + + assert.Equal(t, "inactive", NewWorkerState(StateInactive).String()) + assert.Equal(t, "ready", NewWorkerState(StateReady).String()) + assert.Equal(t, "working", NewWorkerState(StateWorking).String()) + assert.Equal(t, "stopped", NewWorkerState(StateStopped).String()) + assert.Equal(t, "undefined", NewWorkerState(1000).String()) +} + +func Test_IsActive(t *testing.T) { + assert.False(t, NewWorkerState(StateInactive).IsActive()) + assert.True(t, NewWorkerState(StateReady).IsActive()) + assert.True(t, NewWorkerState(StateWorking).IsActive()) + assert.False(t, NewWorkerState(StateStopped).IsActive()) + assert.False(t, NewWorkerState(StateErrored).IsActive()) +} diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 010af076..8ed57ac2 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -8,9 +8,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" - "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -48,25 +46,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.process.State().Value() != states.StateReady { + if tw.process.State().Value() != StateReady { return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(states.StateWorking) + tw.process.State().Set(StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(states.StateErrored) + tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.process.State().Set(states.StateReady) + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() return rsp, nil @@ -91,7 +89,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - if tw.process.State().Value() != states.StateReady { + if tw.process.State().Value() != StateReady { c <- wexec{ payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), @@ -101,13 +99,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(states.StateWorking) + tw.process.State().Set(StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(states.StateErrored) + tw.process.State().Set(StateErrored) tw.process.State().RegisterExec() } c <- wexec{ @@ -117,7 +115,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - tw.process.State().Set(states.StateReady) + tw.process.State().Set(StateReady) tw.process.State().RegisterExec() c <- wexec{ @@ -214,7 +212,7 @@ func (tw *SyncWorkerImpl) Created() time.Time { return tw.process.Created() } -func (tw *SyncWorkerImpl) State() internal.State { +func (tw *SyncWorkerImpl) State() State { return tw.process.State() } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index b726c6f1..f7e8008f 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -39,7 +38,7 @@ type Process struct { // number of Process executions, buf status change time. // publicly this object is receive-only and protected using Mutex // and atomic counter. - state *internal.WorkerState + state *StateImpl // underlying command with associated process, command must be // provided to Process from outside in non-started form. CmdSource @@ -67,7 +66,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { created: time.Now(), events: events.NewEventsHandler(), cmd: cmd, - state: internal.NewWorkerState(states.StateInactive), + state: NewWorkerState(StateInactive), } // set self as stderr implementation (Writer interface) @@ -106,7 +105,7 @@ func (w *Process) addListener(listener events.Listener) { // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. -func (w *Process) State() internal.State { +func (w *Process) State() State { return w.state } @@ -157,13 +156,13 @@ func (w *Process) Wait() error { err = w.cmd.Wait() // If worker was destroyed, just exit - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == StateDestroyed { return nil } // If state is different, and err is not nil, append it to the errors if err != nil { - w.State().Set(states.StateErrored) + w.State().Set(StateErrored) err = multierr.Combine(err, errors.E(op, err)) } @@ -173,12 +172,12 @@ func (w *Process) Wait() error { // and then process.cmd.Wait return an error err2 := w.closeRelay() if err2 != nil { - w.State().Set(states.StateErrored) + w.State().Set(StateErrored) return multierr.Append(err, errors.E(op, err2)) } if w.cmd.ProcessState.Success() { - w.State().Set(states.StateStopped) + w.State().Set(StateStopped) return nil } @@ -198,20 +197,20 @@ func (w *Process) closeRelay() error { // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { var err error - w.state.Set(states.StateStopping) + w.state.Set(StateStopping) err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) if err != nil { - w.state.Set(states.StateKilling) + w.state.Set(StateKilling) return multierr.Append(err, w.cmd.Process.Kill()) } - w.state.Set(states.StateStopped) + w.state.Set(StateStopped) return nil } // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! func (w *Process) Kill() error { - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == StateDestroyed { err := w.cmd.Process.Signal(os.Kill) if err != nil { return err @@ -219,12 +218,12 @@ func (w *Process) Kill() error { return nil } - w.state.Set(states.StateKilling) + w.state.Set(StateKilling) err := w.cmd.Process.Signal(os.Kill) if err != nil { return err } - w.state.Set(states.StateStopped) + w.state.Set(StateStopped) return nil } diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go index 927aa270..ce5011c0 100644 --- a/pkg/worker_watcher/interface.go +++ b/pkg/worker_watcher/interface.go @@ -6,25 +6,26 @@ import ( "github.com/spiral/roadrunner/v2/pkg/worker" ) +// Watcher is an interface for the Sync workers lifecycle type Watcher interface { - // AddToWatch used to add stack to wait its state - AddToWatch(workers []worker.SyncWorker) error + // Watch used to add workers to the stack + Watch(workers []worker.SyncWorker) error - // GetFreeWorker provide first free worker - GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) + // Get provide first free worker + Get(ctx context.Context) (worker.SyncWorker, error) - // PutWorker enqueues worker back - PushWorker(w worker.SyncWorker) + // Push enqueues worker back + Push(w worker.SyncWorker) - // AllocateNew used to allocate new worker and put in into the WorkerWatcher - AllocateNew() error + // Allocate - allocates new worker and put it into the WorkerWatcher + Allocate() error // Destroy destroys the underlying stack Destroy(ctx context.Context) // WorkersList return all stack w/o removing it from internal storage - WorkersList() []worker.SyncWorker + List() []worker.SyncWorker // RemoveWorker remove worker from the stack - RemoveWorker(wb worker.SyncWorker) error + Remove(wb worker.SyncWorker) error } diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index 55034e41..51c3d016 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -129,7 +128,7 @@ func (stack *Stack) Destroy(ctx context.Context) { stack.mutex.Lock() for i := 0; i < len(stack.workers); i++ { // set state for the stack in the stack (unused at the moment) - stack.workers[i].State().Set(states.StateDestroyed) + stack.workers[i].State().Set(worker.StateDestroyed) // kill the worker _ = stack.workers[i].Kill() } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index cf2e1eb7..6b9e9dbf 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -6,7 +6,6 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -28,7 +27,7 @@ type workerWatcher struct { events events.Handler } -func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { +func (ww *workerWatcher) Watch(workers []worker.SyncWorker) error { for i := 0; i < len(workers); i++ { ww.stack.Push(workers[i]) @@ -39,7 +38,7 @@ func (ww *workerWatcher) AddToWatch(workers []worker.SyncWorker) error { return nil } -func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, error) { +func (ww *workerWatcher) Get(ctx context.Context) (worker.SyncWorker, error) { const op = errors.Op("worker_watcher_get_free_worker") // thread safe operation w, stop := ww.stack.Pop() @@ -49,14 +48,25 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, // handle worker remove state // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == states.StateRemove { - err := ww.RemoveWorker(w) + if w != nil && w.State().Value() == worker.StateRemove { + err := ww.Remove(w) if err != nil { return nil, err } // try to get next - return ww.GetFreeWorker(ctx) + return ww.Get(ctx) } + + // if worker not in the ready state it possibly corrupted + if w != nil && w.State().Value() != worker.StateReady { + err := ww.Remove(w) + if err != nil { + return nil, err + } + // try to get next + return ww.Get(ctx) + } + // no free stack if w == nil { for { @@ -79,7 +89,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, return w, nil } -func (ww *workerWatcher) AllocateNew() error { +func (ww *workerWatcher) Allocate() error { ww.stack.mutex.Lock() const op = errors.Op("worker_watcher_allocate_new") sw, err := ww.allocator() @@ -89,12 +99,12 @@ func (ww *workerWatcher) AllocateNew() error { ww.addToWatch(sw) ww.stack.mutex.Unlock() - ww.PushWorker(sw) + ww.Push(sw) return nil } -func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { +func (ww *workerWatcher) Remove(wb worker.SyncWorker) error { ww.mutex.Lock() defer ww.mutex.Unlock() @@ -102,7 +112,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { pid := wb.Pid() if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(states.StateRemove) + wb.State().Set(worker.StateRemove) err := wb.Kill() if err != nil { return errors.E(op, err) @@ -114,7 +124,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { } // O(1) operation -func (ww *workerWatcher) PushWorker(w worker.SyncWorker) { +func (ww *workerWatcher) Push(w worker.SyncWorker) { ww.mutex.Lock() defer ww.mutex.Unlock() ww.stack.Push(w) @@ -127,7 +137,7 @@ func (ww *workerWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation, and it will return copy of the actual workers -func (ww *workerWatcher) WorkersList() []worker.SyncWorker { +func (ww *workerWatcher) List() []worker.SyncWorker { return ww.stack.Workers() } @@ -142,14 +152,14 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } - if w.State().Value() == states.StateDestroyed { + if w.State().Value() == worker.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return } _ = ww.stack.FindAndRemoveByPid(w.Pid()) - err = ww.AllocateNew() + err = ww.Allocate() if err != nil { ww.events.Push(events.PoolEvent{ Event: events.EventPoolError, |