summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/pool/interface.go24
-rwxr-xr-xpkg/pool/static_pool.go2
-rwxr-xr-xpkg/pool/supervisor_pool.go39
-rw-r--r--pkg/pool/supervisor_test.go54
-rwxr-xr-xpkg/worker/sync_worker.go7
-rw-r--r--pkg/worker_watcher/container/vec.go2
-rw-r--r--pkg/worker_watcher/interface.go31
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go21
8 files changed, 136 insertions, 44 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index c22fbbd3..bbf7653e 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -27,3 +27,27 @@ type Pool interface {
// ExecWithContext executes task with context which is used with timeout
execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
}
+
+// Watcher is an interface for the Sync workers lifecycle
+type Watcher interface {
+ // Watch used to add workers to the container
+ Watch(workers []worker.BaseProcess) error
+
+ // Get provide first free worker
+ Get(ctx context.Context) (worker.BaseProcess, error)
+
+ // Push enqueues worker back
+ Push(w worker.BaseProcess)
+
+ // Allocate - allocates new worker and put it into the WorkerWatcher
+ Allocate() error
+
+ // Destroy destroys the underlying container
+ Destroy(ctx context.Context)
+
+ // List return all container w/o removing it from internal storage
+ List() []worker.BaseProcess
+
+ // Remove will remove worker from the container
+ Remove(wb worker.BaseProcess)
+}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index e568661f..5a6247b5 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -41,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener
// manages worker states and TTLs
- ww workerWatcher.Watcher
+ ww Watcher
// allocate new worker
allocator worker.Allocator
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index b09b6f6c..4b990dbe 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -121,7 +121,8 @@ func (sp *supervised) control() { //nolint:gocognit
worker.StateDestroyed,
worker.StateInactive,
worker.StateStopped,
- worker.StateStopping:
+ worker.StateStopping,
+ worker.StateKilling:
continue
}
@@ -132,23 +133,40 @@ func (sp *supervised) control() { //nolint:gocognit
}
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- // SOFT termination. DO NOT STOP active workers
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
+ // just to double check
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- // SOFT termination. DO NOT STOP active workers
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
-
- // mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done
+ // just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
@@ -190,11 +208,20 @@ func (sp *supervised) control() { //nolint:gocognit
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
-
+ // just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 513d369f..1cd301ba 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -9,7 +9,9 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
var cfgSupervised = Config{
@@ -122,6 +124,58 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
assert.NotEqual(t, pid, p.Workers()[0].Pid())
}
+func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 5 * time.Second,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+ pid = p.Workers()[0].Pid()
+
+ resp, err = p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+
+ p.Destroy(context.Background())
+}
+
func TestSupervisedPool_Idle(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: uint64(1),
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 84ff5977..02f11d0b 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -60,6 +60,13 @@ func (tw *SyncWorkerImpl) Exec(p payload.Payload) (payload.Payload, error) {
return payload.Payload{}, errors.E(op, err)
}
+ // supervisor may set state of the worker during the work
+ // in this case we should not re-write the worker state
+ if tw.process.State().Value() != StateWorking {
+ tw.process.State().RegisterExec()
+ return rsp, nil
+ }
+
tw.process.State().Set(StateReady)
tw.process.State().RegisterExec()
diff --git a/pkg/worker_watcher/container/vec.go b/pkg/worker_watcher/container/vec.go
index b9150c43..24b5fa6d 100644
--- a/pkg/worker_watcher/container/vec.go
+++ b/pkg/worker_watcher/container/vec.go
@@ -13,7 +13,7 @@ type Vec struct {
workers chan worker.BaseProcess
}
-func NewVector(initialNumOfWorkers uint64) Vector {
+func NewVector(initialNumOfWorkers uint64) *Vec {
vec := &Vec{
destroy: 0,
workers: make(chan worker.BaseProcess, initialNumOfWorkers),
diff --git a/pkg/worker_watcher/interface.go b/pkg/worker_watcher/interface.go
deleted file mode 100644
index 29fa3640..00000000
--- a/pkg/worker_watcher/interface.go
+++ /dev/null
@@ -1,31 +0,0 @@
-package worker_watcher //nolint:stylecheck
-
-import (
- "context"
-
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-// Watcher is an interface for the Sync workers lifecycle
-type Watcher interface {
- // Watch used to add workers to the container
- Watch(workers []worker.BaseProcess) error
-
- // Get provide first free worker
- Get(ctx context.Context) (worker.BaseProcess, error)
-
- // Push enqueues worker back
- Push(w worker.BaseProcess)
-
- // Allocate - allocates new worker and put it into the WorkerWatcher
- Allocate() error
-
- // Destroy destroys the underlying container
- Destroy(ctx context.Context)
-
- // List return all container w/o removing it from internal storage
- List() []worker.BaseProcess
-
- // Remove will remove worker from the container
- Remove(wb worker.BaseProcess)
-}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index f82de958..b2d61d48 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -11,8 +11,18 @@ import (
"github.com/spiral/roadrunner/v2/pkg/worker_watcher/container"
)
+// Vector interface represents vector container
+type Vector interface {
+ // Enqueue used to put worker to the vector
+ Enqueue(worker.BaseProcess)
+ // Dequeue used to get worker from the vector
+ Dequeue(ctx context.Context) (worker.BaseProcess, error)
+ // Destroy used to stop releasing the workers
+ Destroy()
+}
+
// NewSyncWorkerWatcher is a constructor for the Watcher
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) *workerWatcher {
ww := &workerWatcher{
container: container.NewVector(numWorkers),
numWorkers: numWorkers,
@@ -26,7 +36,7 @@ func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events
type workerWatcher struct {
sync.RWMutex
- container container.Vector
+ container Vector
// used to control the Destroy stage (that all workers are in the container)
numWorkers uint64
workers []worker.BaseProcess
@@ -150,11 +160,12 @@ func (ww *workerWatcher) Remove(wb worker.BaseProcess) {
// Push O(1) operation
func (ww *workerWatcher) Push(w worker.BaseProcess) {
- if w.State().Value() != worker.StateReady {
+ switch w.State().Value() {
+ case worker.StateReady:
+ ww.container.Enqueue(w)
+ default:
_ = w.Kill()
- return
}
- ww.container.Enqueue(w)
}
// Destroy all underlying container (but let them to complete the task)