summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-02 19:17:03 +0300
committerValery Piashchynski <[email protected]>2021-02-02 19:17:03 +0300
commit2bdf7fafa73cabf7cf63657a6b58f2a423ae0fcd (patch)
treefe5efc14a18f78218816a2bc7a2e19ee95642714 /pkg
parent68becf8c58daec426f94513cf444061c199194d7 (diff)
Move worker states out of internal
Diffstat (limited to 'pkg')
-rw-r--r--pkg/events/pool_events.go5
-rwxr-xr-xpkg/pool/static_pool.go8
-rwxr-xr-xpkg/pool/static_pool_test.go6
-rwxr-xr-xpkg/pool/supervisor_pool.go6
-rw-r--r--pkg/states/worker_states.go31
-rwxr-xr-xpkg/transport/pipe/pipe_factory.go5
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go10
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go10
-rwxr-xr-xpkg/transport/socket/socket_factory.go5
-rwxr-xr-xpkg/worker/sync_worker.go17
-rwxr-xr-xpkg/worker/worker.go23
-rw-r--r--pkg/worker_watcher/stack.go4
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go8
13 files changed, 89 insertions, 49 deletions
diff --git a/pkg/events/pool_events.go b/pkg/events/pool_events.go
index 3925df56..c569a5a8 100644
--- a/pkg/events/pool_events.go
+++ b/pkg/events/pool_events.go
@@ -28,6 +28,9 @@ const (
// EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
EventExecTTL
+
+ // EventPoolRestart triggered when pool restart is needed
+ EventPoolRestart
)
type P int64
@@ -52,6 +55,8 @@ func (ev P) String() string {
return "EventIdleTTL"
case EventExecTTL:
return "EventExecTTL"
+ case EventPoolRestart:
+ return "EventPoolRestart"
}
return "Unknown event type"
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 44adf9c0..01b0574d 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -6,9 +6,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport"
"github.com/spiral/roadrunner/v2/pkg/worker"
workerWatcher "github.com/spiral/roadrunner/v2/pkg/worker_watcher"
@@ -195,7 +195,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p
func (sp *StaticPool) stopWorker(w worker.SyncWorker) {
const op = errors.Op("static_pool_stop_worker")
- w.State().Set(internal.StateInvalid)
+ w.State().Set(states.StateInvalid)
err := w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
@@ -251,7 +251,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
}
- w.State().Set(internal.StateInvalid)
+ w.State().Set(states.StateInvalid)
err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
@@ -263,7 +263,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return payload.Payload{}, errors.E(op, err)
}
- w.State().Set(internal.StateInvalid)
+ w.State().Set(states.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
errS := w.Stop()
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index a32790e0..2d2b2b7d 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -12,9 +12,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
"github.com/stretchr/testify/assert"
)
@@ -255,7 +255,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
list := p.Workers()
for _, w := range list {
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
}
}
@@ -462,7 +462,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
assert.NotNil(t, p)
for i := range p.Workers() {
- p.Workers()[i].State().Set(internal.StateErrored)
+ p.Workers()[i].State().Set(states.StateErrored)
}
_, err = p.Exec(payload.Payload{Body: []byte("hello")})
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 2597b352..3347ecd4 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -6,9 +6,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/tools"
)
@@ -144,7 +144,7 @@ func (sp *supervised) control() {
workers := sp.pool.Workers()
for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == internal.StateInvalid {
+ if workers[i].State().Value() == states.StateInvalid {
continue
}
@@ -177,7 +177,7 @@ func (sp *supervised) control() {
// firs we check maxWorker idle
if sp.cfg.IdleTTL != 0 {
// then check for the worker state
- if workers[i].State().Value() != internal.StateReady {
+ if workers[i].State().Value() != states.StateReady {
continue
}
diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go
new file mode 100644
index 00000000..22fdfe8a
--- /dev/null
+++ b/pkg/states/worker_states.go
@@ -0,0 +1,31 @@
+package states
+
+const (
+ // StateInactive - no associated process
+ StateInactive int64 = iota
+
+ // StateReady - ready for job.
+ StateReady
+
+ // StateWorking - working on given payload.
+ StateWorking
+
+ // StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
+ StateInvalid
+
+ // StateStopping - process is being softly stopped.
+ StateStopping
+
+ StateKilling
+
+ // State of worker, when no need to allocate new one
+ StateDestroyed
+
+ // StateStopped - process has been terminated.
+ StateStopped
+
+ // StateErrored - error WorkerState (can't be used).
+ StateErrored
+
+ StateRemove
+)
diff --git a/pkg/transport/pipe/pipe_factory.go b/pkg/transport/pipe/pipe_factory.go
index dd7c5841..929043a7 100755
--- a/pkg/transport/pipe/pipe_factory.go
+++ b/pkg/transport/pipe/pipe_factory.go
@@ -8,6 +8,7 @@ import (
"github.com/spiral/goridge/v3/pkg/pipe"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
)
@@ -92,7 +93,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
// everything ok, set ready state
- w.State().Set(internal.StateReady)
+ w.State().Set(states.StateReady)
// return worker
c <- SpawnResult{
@@ -152,7 +153,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
// everything ok, set ready state
- w.State().Set(internal.StateReady)
+ w.State().Set(states.StateReady)
return w, nil
}
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index d4949c82..e247324c 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -8,9 +8,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -21,13 +21,13 @@ func Test_GetState2(t *testing.T) {
w, err := NewPipeFactory().SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
- assert.Equal(t, internal.StateStopped, w.State().Value())
+ assert.Equal(t, states.StateStopped, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
assert.NoError(t, w.Stop())
}
@@ -40,13 +40,13 @@ func Test_Kill2(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- assert.Equal(t, internal.StateErrored, w.State().Value())
+ assert.Equal(t, states.StateErrored, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
err = w.Kill()
if err != nil {
t.Errorf("error killing the Process: error %v", err)
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index 38166b85..b23af19f 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -9,9 +9,9 @@ import (
"time"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
)
@@ -23,13 +23,13 @@ func Test_GetState(t *testing.T) {
w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
- assert.Equal(t, internal.StateStopped, w.State().Value())
+ assert.Equal(t, states.StateStopped, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
@@ -46,13 +46,13 @@ func Test_Kill(t *testing.T) {
go func() {
defer wg.Done()
assert.Error(t, w.Wait())
- assert.Equal(t, internal.StateErrored, w.State().Value())
+ assert.Equal(t, states.StateErrored, w.State().Value())
}()
assert.NoError(t, err)
assert.NotNil(t, w)
- assert.Equal(t, internal.StateReady, w.State().Value())
+ assert.Equal(t, states.StateReady, w.State().Value())
err = w.Kill()
if err != nil {
t.Errorf("error killing the Process: error %v", err)
diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go
index ccd2b0bf..98bd2389 100755
--- a/pkg/transport/socket/socket_factory.go
+++ b/pkg/transport/socket/socket_factory.go
@@ -13,6 +13,7 @@ import (
"github.com/spiral/goridge/v3/pkg/socket"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
"go.uber.org/multierr"
@@ -124,7 +125,7 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, lis
}
w.AttachRelay(rl)
- w.State().Set(internal.StateReady)
+ w.State().Set(states.StateReady)
c <- socketSpawn{
w: w,
@@ -167,7 +168,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.Listener) (*wor
}
w.AttachRelay(rl)
- w.State().Set(internal.StateReady)
+ w.State().Set(states.StateReady)
return w, nil
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 1a0393fb..696fbdb7 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -10,6 +10,7 @@ import (
"github.com/spiral/goridge/v3/pkg/frame"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/payload"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
@@ -49,25 +50,25 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, errors.Str("payload can not be empty"))
}
- if tw.process.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != states.StateReady {
return payload.Payload{}, errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String()))
}
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(internal.StateWorking)
+ tw.process.State().Set(states.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(internal.StateErrored)
+ tw.process.State().Set(states.StateErrored)
tw.process.State().RegisterExec()
}
return payload.Payload{}, err
}
- tw.process.State().Set(internal.StateReady)
+ tw.process.State().Set(states.StateReady)
tw.process.State().RegisterExec()
return rsp, nil
@@ -92,7 +93,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- if tw.process.State().Value() != internal.StateReady {
+ if tw.process.State().Value() != states.StateReady {
c <- wexec{
payload: payload.Payload{},
err: errors.E(op, errors.Errorf("Process is not ready (%s)", tw.process.State().String())),
@@ -102,13 +103,13 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
// set last used time
tw.process.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.process.State().Set(internal.StateWorking)
+ tw.process.State().Set(states.StateWorking)
rsp, err := tw.execPayload(p)
if err != nil {
// just to be more verbose
if errors.Is(errors.SoftJob, err) == false {
- tw.process.State().Set(internal.StateErrored)
+ tw.process.State().Set(states.StateErrored)
tw.process.State().RegisterExec()
}
c <- wexec{
@@ -118,7 +119,7 @@ func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload
return
}
- tw.process.State().Set(internal.StateReady)
+ tw.process.State().Set(states.StateReady)
tw.process.State().RegisterExec()
c <- wexec{
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 8fd71cca..2f1f399d 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -15,6 +15,7 @@ import (
"github.com/spiral/goridge/v3/interfaces/relay"
"github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"go.uber.org/multierr"
)
@@ -85,7 +86,7 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
created: time.Now(),
events: events.NewEventsHandler(),
cmd: cmd,
- state: internal.NewWorkerState(internal.StateInactive),
+ state: internal.NewWorkerState(states.StateInactive),
stderr: new(bytes.Buffer),
stop: make(chan struct{}, 1),
// sync pool for STDERR
@@ -190,7 +191,7 @@ func (w *Process) Wait() error {
const op = errors.Op("process_wait")
err := multierr.Combine(w.cmd.Wait())
- if w.State().Value() == internal.StateDestroyed {
+ if w.State().Value() == states.StateDestroyed {
return errors.E(op, err)
}
@@ -199,7 +200,7 @@ func (w *Process) Wait() error {
// and then process.cmd.Wait return an error
w.endState = w.cmd.ProcessState
if err != nil {
- w.state.Set(internal.StateErrored)
+ w.state.Set(states.StateErrored)
w.mu.RLock()
// if process return code > 0, here will be an error from stderr (if presents)
@@ -215,12 +216,12 @@ func (w *Process) Wait() error {
err = multierr.Append(err, w.closeRelay())
if err != nil {
- w.state.Set(internal.StateErrored)
+ w.state.Set(states.StateErrored)
return err
}
if w.endState.Success() {
- w.state.Set(internal.StateStopped)
+ w.state.Set(states.StateStopped)
}
w.stderr.Reset()
@@ -241,20 +242,20 @@ func (w *Process) closeRelay() error {
// Stop sends soft termination command to the Process and waits for process completion.
func (w *Process) Stop() error {
var err error
- w.state.Set(internal.StateStopping)
+ w.state.Set(states.StateStopping)
err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
if err != nil {
- w.state.Set(internal.StateKilling)
+ w.state.Set(states.StateKilling)
return multierr.Append(err, w.cmd.Process.Kill())
}
- w.state.Set(internal.StateStopped)
+ w.state.Set(states.StateStopped)
return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
func (w *Process) Kill() error {
- if w.State().Value() == internal.StateDestroyed {
+ if w.State().Value() == states.StateDestroyed {
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
@@ -262,12 +263,12 @@ func (w *Process) Kill() error {
return nil
}
- w.state.Set(internal.StateKilling)
+ w.state.Set(states.StateKilling)
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
}
- w.state.Set(internal.StateStopped)
+ w.state.Set(states.StateStopped)
return nil
}
diff --git a/pkg/worker_watcher/stack.go b/pkg/worker_watcher/stack.go
index d76f4d8f..55034e41 100644
--- a/pkg/worker_watcher/stack.go
+++ b/pkg/worker_watcher/stack.go
@@ -5,7 +5,7 @@ import (
"sync"
"time"
- "github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -129,7 +129,7 @@ func (stack *Stack) Destroy(ctx context.Context) {
stack.mutex.Lock()
for i := 0; i < len(stack.workers); i++ {
// set state for the stack in the stack (unused at the moment)
- stack.workers[i].State().Set(internal.StateDestroyed)
+ stack.workers[i].State().Set(states.StateDestroyed)
// kill the worker
_ = stack.workers[i].Kill()
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 753b61ee..cf2e1eb7 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -5,8 +5,8 @@ import (
"sync"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/internal"
"github.com/spiral/roadrunner/v2/pkg/events"
+ "github.com/spiral/roadrunner/v2/pkg/states"
"github.com/spiral/roadrunner/v2/pkg/worker"
)
@@ -49,7 +49,7 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (worker.SyncWorker,
// handle worker remove state
// in this state worker is destroyed by supervisor
- if w != nil && w.State().Value() == internal.StateRemove {
+ if w != nil && w.State().Value() == states.StateRemove {
err := ww.RemoveWorker(w)
if err != nil {
return nil, err
@@ -102,7 +102,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.SyncWorker) error {
pid := wb.Pid()
if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(internal.StateRemove)
+ wb.State().Set(states.StateRemove)
err := wb.Kill()
if err != nil {
return errors.E(op, err)
@@ -142,7 +142,7 @@ func (ww *workerWatcher) wait(w worker.BaseProcess) {
})
}
- if w.State().Value() == internal.StateDestroyed {
+ if w.State().Value() == states.StateDestroyed {
// worker was manually destroyed, no need to replace
ww.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
return