diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 19:11:29 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 19:11:29 +0300 |
commit | 3ce3b5a6e0839e31d2cfb3d3b6fa7f9c6ca2e0af (patch) | |
tree | 0f975f62cc60b8ab75c92691f08270242c36f311 /pkg/pool | |
parent | cd07985494b3ebb03fd6553bed9aa1393052ffc5 (diff) | |
parent | 67db4b5f7b66e9a32713133baed83c3ab7146bb8 (diff) |
Merge remote-tracking branch 'origin/master' into feature/jobs_plugin
# Conflicts:
# pkg/worker_watcher/interface.go
Diffstat (limited to 'pkg/pool')
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 39 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 54 |
2 files changed, 87 insertions, 6 deletions
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index b09b6f6c..4b990dbe 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -121,7 +121,8 @@ func (sp *supervised) control() { //nolint:gocognit worker.StateDestroyed, worker.StateInactive, worker.StateStopped, - worker.StateStopping: + worker.StateStopping, + worker.StateKilling: continue } @@ -132,23 +133,40 @@ func (sp *supervised) control() { //nolint:gocognit } if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { - // SOFT termination. DO NOT STOP active workers + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + if workers[i].State().Value() != worker.StateWorking { workers[i].State().Set(worker.StateInvalid) _ = workers[i].Stop() } + // just to double check + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { - // SOFT termination. DO NOT STOP active workers + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + if workers[i].State().Value() != worker.StateWorking { workers[i].State().Set(worker.StateInvalid) _ = workers[i].Stop() } - - // mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done + // just to double check workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue @@ -190,11 +208,20 @@ func (sp *supervised) control() { //nolint:gocognit // After the control check, res will be 5, idle is 1 // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { + /* + worker at this point might be in the middle of request execution: + + ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push + ^ + TTL Reached, state - invalid | + -----> Worker Stopped here + */ + if workers[i].State().Value() != worker.StateWorking { workers[i].State().Set(worker.StateInvalid) _ = workers[i].Stop() } - + // just to double check workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index c33aa6fb..a321fdf0 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -9,7 +9,9 @@ import ( "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" + "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var cfgSupervised = &Config{ @@ -122,6 +124,58 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { assert.NotEqual(t, pid, p.Workers()[0].Pid()) } +func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 5 * time.Second, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + pid := p.Workers()[0].Pid() + + resp, err := p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Equal(t, string(resp.Body), "hello world") + assert.Empty(t, resp.Context) + + time.Sleep(time.Second) + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) + pid = p.Workers()[0].Pid() + + resp, err = p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Equal(t, string(resp.Body), "hello world") + assert.Empty(t, resp.Context) + + time.Sleep(time.Second) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) + + p.Destroy(context.Background()) +} + func TestSupervisedPool_Idle(t *testing.T) { var cfgExecTTL = &Config{ NumWorkers: uint64(1), |