path: root/pkg/pool
author		Valery Piashchynski <[email protected]>	2021-07-14 16:46:32 +0300
committer	Valery Piashchynski <[email protected]>	2021-07-14 16:46:32 +0300
commit		4151bbffe7b3ab882de5f7ac29f41c974679f087 (patch)
tree		c29840fe2b0e530c069f47ec956b606cd8ff6b1d /pkg/pool
parent		9d018f259b45be9268ae85e089a07f25de894f41 (diff)
Fix TTL issue, add explanation comments.

After executing a request, the worker may overwrite the TTL state set by the supervisor. This inconsistency leads to a +1 worker in the FIFO channel, and in that state the Push operation was blocked. Also add RR_BROADCAST_PATH.

Signed-off-by: Valery Piashchynski <[email protected]>
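To make the race concrete, here is a condensed sketch of the fix applied below in supervisor_pool.go. The helper name is hypothetical; the state names and calls are the ones used in the diff:

package pool

import "github.com/spiral/roadrunner/v2/pkg/worker"

// markExpired is a hypothetical condensation of the pattern applied in
// supervisor_pool.go. The supervisor marks an expired worker Invalid, but a
// worker that was mid-request sets its state back to Ready when the request
// finishes, overwriting that mark. The unconditional second Set guarantees
// that the watcher's Push sees Invalid and stops the worker instead of
// enqueueing it, which previously left a +1 worker blocking the FIFO channel.
func markExpired(w worker.BaseProcess) {
	if w.State().Value() != worker.StateWorking {
		w.State().Set(worker.StateInvalid)
		_ = w.Stop() // safe: the worker is not serving a request right now
	}
	// double-set: even if the worker concurrently flips back to Ready,
	// the final observed state is Invalid
	w.State().Set(worker.StateInvalid)
}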
Diffstat (limited to 'pkg/pool')
-rw-r--r--	pkg/pool/interface.go		24
-rwxr-xr-x	pkg/pool/static_pool.go		 2
-rwxr-xr-x	pkg/pool/supervisor_pool.go	39
-rw-r--r--	pkg/pool/supervisor_test.go	54
4 files changed, 112 insertions(+), 7 deletions(-)
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go
index c22fbbd3..bbf7653e 100644
--- a/pkg/pool/interface.go
+++ b/pkg/pool/interface.go
@@ -27,3 +27,27 @@ type Pool interface {
// execWithTTL executes a task with a context that enforces the timeout
execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
}
+
+// Watcher is an interface for the sync workers' lifecycle
+type Watcher interface {
+	// Watch adds workers to the container
+ Watch(workers []worker.BaseProcess) error
+
+	// Get provides the first free worker
+ Get(ctx context.Context) (worker.BaseProcess, error)
+
+	// Push enqueues the worker back into the container
+ Push(w worker.BaseProcess)
+
+	// Allocate allocates a new worker and puts it into the WorkerWatcher
+ Allocate() error
+
+ // Destroy destroys the underlying container
+ Destroy(ctx context.Context)
+
+	// List returns all workers without removing them from the internal storage
+ List() []worker.BaseProcess
+
+	// Remove removes the worker from the container
+ Remove(wb worker.BaseProcess)
+}
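Since Watcher is a new interface, a brief usage sketch may help. This is illustrative only: the serve helper is hypothetical, and it assumes the container holds worker.SyncWorker implementations, which is how the static pool uses it:

package pool

import (
	"context"

	"github.com/spiral/roadrunner/v2/pkg/payload"
	"github.com/spiral/roadrunner/v2/pkg/worker"
)

// serve sketches the Watcher contract: take a free worker, execute the
// payload, and always hand the worker back via Push. Push is also the point
// where a worker in the Invalid state is stopped instead of re-queued.
func serve(ctx context.Context, ww Watcher, rqs payload.Payload) (payload.Payload, error) {
	w, err := ww.Get(ctx) // blocks until a free worker is available
	if err != nil {
		return payload.Payload{}, err
	}
	defer ww.Push(w) // return the worker to the container

	sw := w.(worker.SyncWorker) // assumption: the container holds sync workers
	return sw.Exec(rqs)
}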
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index e568661f..5a6247b5 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -41,7 +41,7 @@ type StaticPool struct {
listeners []events.Listener
// manages worker states and TTLs
- ww workerWatcher.Watcher
+ ww Watcher
// allocate new worker
allocator worker.Allocator
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index b09b6f6c..4b990dbe 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -121,7 +121,8 @@ func (sp *supervised) control() { //nolint:gocognit
worker.StateDestroyed,
worker.StateInactive,
worker.StateStopped,
- worker.StateStopping:
+ worker.StateStopping,
+ worker.StateKilling:
continue
}
@@ -132,23 +133,40 @@ func (sp *supervised) control() { //nolint:gocognit
}
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- // SOFT termination. DO NOT STOP active workers
+			/*
+				The worker at this point might be in the middle of a request execution:
+
+				---> REQ ---> WORKER ---------------> RESP (we must not set the Ready state here) ---> ww.Push
+				               ^                                                                          |
+				      TTL reached,                               the worker slips in between supervisor --+
+				      state set to Invalid                       checks and is stopped inside ww.Push
+			*/
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
+			// set Invalid unconditionally: a worker that just finished a request may have flipped its state back to Ready
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- // SOFT termination. DO NOT STOP active workers
+			/*
+				The worker at this point might be in the middle of a request execution:
+
+				---> REQ ---> WORKER ---------------> RESP (we must not set the Ready state here) ---> ww.Push
+				               ^                                                                          |
+				      memory limit reached,                      the worker slips in between supervisor --+
+				      state set to Invalid                       checks and is stopped inside ww.Push
+			*/
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
-
- // mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done
+			// set Invalid unconditionally: a worker that just finished a request may have flipped its state back to Ready
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
@@ -190,11 +208,20 @@ func (sp *supervised) control() { //nolint:gocognit
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+			/*
+				The worker at this point might be in the middle of a request execution:
+
+				---> REQ ---> WORKER ---------------> RESP (we must not set the Ready state here) ---> ww.Push
+				               ^                                                                          |
+				      idle TTL reached,                          the worker slips in between supervisor --+
+				      state set to Invalid                       checks and is stopped inside ww.Push
+			*/
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
-
+			// set Invalid unconditionally: a worker that just finished a request may have flipped its state back to Ready
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
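All three branches above share the same soft-termination pattern and are driven by a periodic tick. A simplified sketch of that loop follows; the stopCh parameter and this exact wiring are assumptions for illustration, while control() and cfg.WatchTick come from the real file and tests:

package pool

import "time"

// startSupervising sketches the ticker-driven goroutine that invokes the
// checks shown above. The stop channel is an assumption; the real pool
// wires an equivalent stop signal.
func (sp *supervised) startSupervising(stopCh <-chan struct{}) {
	go func() {
		ticker := time.NewTicker(sp.cfg.WatchTick) // e.g. 1 * time.Second in the tests below
		defer ticker.Stop()
		for {
			select {
			case <-stopCh:
				return
			case <-ticker.C:
				sp.control() // runs the TTL / max-memory / idle-TTL checks
			}
		}
	}()
}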
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 513d369f..1cd301ba 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -9,7 +9,9 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
var cfgSupervised = Config{
@@ -122,6 +124,58 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
assert.NotEqual(t, pid, p.Workers()[0].Pid())
}
+func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 5 * time.Second,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+ pid = p.Workers()[0].Pid()
+
+ resp, err = p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+	// should be a new worker with a new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+
+ p.Destroy(context.Background())
+}
+
func TestSupervisedPool_Idle(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: uint64(1),