From 2bdf7fafa73cabf7cf63657a6b58f2a423ae0fcd Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 2 Feb 2021 19:17:03 +0300 Subject: Move worker states out of internal --- pkg/events/pool_events.go | 5 +++++ pkg/pool/static_pool.go | 8 +++---- pkg/pool/static_pool_test.go | 6 +++--- pkg/pool/supervisor_pool.go | 6 +++--- pkg/states/worker_states.go | 31 +++++++++++++++++++++++++++ pkg/transport/pipe/pipe_factory.go | 5 +++-- pkg/transport/pipe/pipe_factory_spawn_test.go | 10 ++++----- pkg/transport/pipe/pipe_factory_test.go | 10 ++++----- pkg/transport/socket/socket_factory.go | 5 +++-- pkg/worker/sync_worker.go | 17 ++++++++------- pkg/worker/worker.go | 23 ++++++++++---------- pkg/worker_watcher/stack.go | 4 ++-- pkg/worker_watcher/worker_watcher.go | 8 +++---- 13 files changed, 89 insertions(+), 49 deletions(-) create mode 100644 pkg/states/worker_states.go (limited to 'pkg') diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go index 3925df56..c569a5a8 100644 --- a/pkg/events/pool_events.go +++ b/pkg/events/pool_events.go @@ -28,6 +28,9 @@ const ( // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). EventExecTTL + + // EventPoolRestart triggered when pool restart is needed + EventPoolRestart ) type P int64 @@ -52,6 +55,8 @@ func (ev P) String() string { return "EventIdleTTL" case EventExecTTL: return "EventExecTTL" + case EventPoolRestart: + return "EventPoolRestart" } return "Unknown event type" } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 44adf9c0..01b0574d 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -6,9 +6,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/transport" "github.com/spiral/roadrunner/v2/pkg/worker" workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher" @@ -195,7 +195,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p func (sp *StaticPool) stopWorker(w worker.SyncWorker) { const op = errors.Op("static_pool_stop_worker") - w.State().Set(internal.StateInvalid) + w.State().Set(states.StateInvalid) err := w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) @@ -251,7 +251,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) } - w.State().Set(internal.StateInvalid) + w.State().Set(states.StateInvalid) err = w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) @@ -263,7 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return payload.Payload{}, errors.E(op, err) } - w.State().Set(internal.StateInvalid) + w.State().Set(states.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) errS := w.Stop() diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index a32790e0..2d2b2b7d 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -12,9 +12,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" "github.com/stretchr/testify/assert" ) @@ -255,7 +255,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { list := p.Workers() for _, w := range list { - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) } } @@ -462,7 +462,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { assert.NotNil(t, p) for i := range p.Workers() { - p.Workers()[i].State().Set(internal.StateErrored) + p.Workers()[i].State().Set(states.StateErrored) } _, err = p.Exec(payload.Payload{Body: []byte("hello")}) diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 2597b352..3347ecd4 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -6,9 +6,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/tools" ) @@ -144,7 +144,7 @@ func (sp *supervised) control() { workers := sp.pool.Workers() for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == internal.StateInvalid { + if workers[i].State().Value() == states.StateInvalid { continue } @@ -177,7 +177,7 @@ func (sp *supervised) control() { // firs we check maxWorker idle if sp.cfg.IdleTTL != 0 { // then check for the worker state - if workers[i].State().Value() != internal.StateReady { + if workers[i].State().Value() != states.StateReady { continue } diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go new file mode 100644 index 00000000..22fdfe8a --- /dev/null +++ b/pkg/states/worker_states.go @@ -0,0 +1,31 @@ +package states + +const ( + // StateInactive - no associated process + StateInactive int64 = iota + + // StateReady - ready for job. + StateReady + + // StateWorking - working on given payload. + StateWorking + + // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. + StateInvalid + + // StateStopping - process is being softly stopped. + StateStopping + + StateKilling + + // State of worker, when no need to allocate new one + StateDestroyed + + // StateStopped - process has been terminated. + StateStopped + + // StateErrored - error WorkerState (can't be used). + StateErrored + + StateRemove +) diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go index dd7c5841..929043a7 100755 --- a/pkg/transport/pipe/pipe_factory.go +++ b/pkg/transport/pipe/pipe_factory.go @@ -8,6 +8,7 @@ import ( "github.com/spiral/goridge/v3/pkg/pipe" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" ) @@ -92,7 +93,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } // everything ok, set ready state - w.State().Set(internal.StateReady) + w.State().Set(states.StateReady) // return worker c <- SpawnResult{ @@ -152,7 +153,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } // everything ok, set ready state - w.State().Set(internal.StateReady) + w.State().Set(states.StateReady) return w, nil } diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index d4949c82..e247324c 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -8,9 +8,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -21,13 +21,13 @@ func Test_GetState2(t *testing.T) { w, err := NewPipeFactory().SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) - assert.Equal(t, internal.StateStopped, w.State().Value()) + assert.Equal(t, states.StateStopped, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) assert.NoError(t, w.Stop()) } @@ -40,13 +40,13 @@ func Test_Kill2(t *testing.T) { go func() { defer wg.Done() assert.Error(t, w.Wait()) - assert.Equal(t, internal.StateErrored, w.State().Value()) + assert.Equal(t, states.StateErrored, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) err = w.Kill() if err != nil { t.Errorf("error killing the Process: error %v", err) diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index 38166b85..b23af19f 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -9,9 +9,9 @@ import ( "time" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" ) @@ -23,13 +23,13 @@ func Test_GetState(t *testing.T) { w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) - assert.Equal(t, internal.StateStopped, w.State().Value()) + assert.Equal(t, states.StateStopped, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) @@ -46,13 +46,13 @@ func Test_Kill(t *testing.T) { go func() { defer wg.Done() assert.Error(t, w.Wait()) - assert.Equal(t, internal.StateErrored, w.State().Value()) + assert.Equal(t, states.StateErrored, w.State().Value()) }() assert.NoError(t, err) assert.NotNil(t, w) - assert.Equal(t, internal.StateReady, w.State().Value()) + assert.Equal(t, states.StateReady, w.State().Value()) err = w.Kill() if err != nil { t.Errorf("error killing the Process: error %v", err) diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index ccd2b0bf..98bd2389 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -13,6 +13,7 @@ import ( "github.com/spiral/goridge/v3/pkg/socket" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" "go.uber.org/multierr" @@ -124,7 +125,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis } w.AttachRelay(rl) - w.State().Set(internal.StateReady) + w.State().Set(states.StateReady) c <- socketSpawn{ w: w, @@ -167,7 +168,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor } w.AttachRelay(rl) - w.State().Set(internal.StateReady) + w.State().Set(states.StateReady) return w, nil } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 1a0393fb..696fbdb7 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -10,6 +10,7 @@ import ( "github.com/spiral/goridge/v3/pkg/frame" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/payload" + "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -49,25 +50,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) { return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty")) } - if tw.process.State().Value() != internal.StateReady { + if tw.process.State().Value() != states.StateReady { return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())) } // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(internal.StateWorking) + tw.process.State().Set(states.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(internal.StateErrored) + tw.process.State().Set(states.StateErrored) tw.process.State().RegisterExec() } return payload.Payload{}, err } - tw.process.State().Set(internal.StateReady) + tw.process.State().Set(states.StateReady) tw.process.State().RegisterExec() return rsp, nil @@ -92,7 +93,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - if tw.process.State().Value() != internal.StateReady { + if tw.process.State().Value() != states.StateReady { c <- wexec{ payload: payload.Payload{}, err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())), @@ -102,13 +103,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload // set last used time tw.process.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.process.State().Set(internal.StateWorking) + tw.process.State().Set(states.StateWorking) rsp, err := tw.execPayload(p) if err != nil { // just to be more verbose if errors.Is(errors.SoftJob, err) == false { - tw.process.State().Set(internal.StateErrored) + tw.process.State().Set(states.StateErrored) tw.process.State().RegisterExec() } c <- wexec{ @@ -118,7 +119,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload return } - tw.process.State().Set(internal.StateReady) + tw.process.State().Set(states.StateReady) tw.process.State().RegisterExec() c <- wexec{ diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 8fd71cca..2f1f399d 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -15,6 +15,7 @@ import ( "github.com/spiral/goridge/v3/interfaces/relay" "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/states" "go.uber.org/multierr" ) @@ -85,7 +86,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { created: time.Now(), events: events.NewEventsHandler(), cmd: cmd, - state: internal.NewWorkerState(internal.StateInactive), + state: internal.NewWorkerState(states.StateInactive), stderr: new(bytes.Buffer), stop: make(chan struct{}, 1), // sync pool for STDERR @@ -190,7 +191,7 @@ func (w *Process) Wait() error { const op = errors.Op("process_wait") err := multierr.Combine(w.cmd.Wait()) - if w.State().Value() == internal.StateDestroyed { + if w.State().Value() == states.StateDestroyed { return errors.E(op, err) } @@ -199,7 +200,7 @@ func (w *Process) Wait() error { // and then process.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { - w.state.Set(internal.StateErrored) + w.state.Set(states.StateErrored) w.mu.RLock() // if process return code > 0, here will be an error from stderr (if presents) @@ -215,12 +216,12 @@ func (w *Process) Wait() error { err = multierr.Append(err, w.closeRelay()) if err != nil { - w.state.Set(internal.StateErrored) + w.state.Set(states.StateErrored) return err } if w.endState.Success() { - w.state.Set(internal.StateStopped) + w.state.Set(states.StateStopped) } w.stderr.Reset() @@ -241,20 +242,20 @@ func (w *Process) closeRelay() error { // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { var err error - w.state.Set(internal.StateStopping) + w.state.Set(states.StateStopping) err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) if err != nil { - w.state.Set(internal.StateKilling) + w.state.Set(states.StateKilling) return multierr.Append(err, w.cmd.Process.Kill()) } - w.state.Set(internal.StateStopped) + w.state.Set(states.StateStopped) return nil } // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! func (w *Process) Kill() error { - if w.State().Value() == internal.StateDestroyed { + if w.State().Value() == states.StateDestroyed { err := w.cmd.Process.Signal(os.Kill) if err != nil { return err @@ -262,12 +263,12 @@ func (w *Process) Kill() error { return nil } - w.state.Set(internal.StateKilling) + w.state.Set(states.StateKilling) err := w.cmd.Process.Signal(os.Kill) if err != nil { return err } - w.state.Set(internal.StateStopped) + w.state.Set(states.StateStopped) return nil } diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go index d76f4d8f..55034e41 100644 --- a/pkg/worker_watcher/stack.go +++ b/pkg/worker_watcher/stack.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -129,7 +129,7 @@ func (stack *Stack) Destroy(ctx context.Context) { stack.mutex.Lock() for i := 0; i < len(stack.workers); i++ { // set state for the stack in the stack (unused at the moment) - stack.workers[i].State().Set(internal.StateDestroyed) + stack.workers[i].State().Set(states.StateDestroyed) // kill the worker _ = stack.workers[i].Kill() } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 753b61ee..cf2e1eb7 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -5,8 +5,8 @@ import ( "sync" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/internal" "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/pkg/states" "github.com/spiral/roadrunner/v2/pkg/worker" ) @@ -49,7 +49,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker, // handle worker remove state // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == internal.StateRemove { + if w != nil && w.State().Value() == states.StateRemove { err := ww.RemoveWorker(w) if err != nil { return nil, err @@ -102,7 +102,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error { pid := wb.Pid() if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(internal.StateRemove) + wb.State().Set(states.StateRemove) err := wb.Kill() if err != nil { return errors.E(op, err) @@ -142,7 +142,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) { }) } - if w.State().Value() == internal.StateDestroyed { + if w.State().Value() == states.StateDestroyed { // worker was manually destroyed, no need to replace ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) return -- cgit v1.2.3