summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 19:11:29 +0300
committerValery Piashchynski <[email protected]>2021-07-14 19:11:29 +0300
commit3ce3b5a6e0839e31d2cfb3d3b6fa7f9c6ca2e0af (patch)
tree0f975f62cc60b8ab75c92691f08270242c36f311 /pkg/pool
parentcd07985494b3ebb03fd6553bed9aa1393052ffc5 (diff)
parent67db4b5f7b66e9a32713133baed83c3ab7146bb8 (diff)
Merge remote-tracking branch 'origin/master' into feature/jobs_plugin
# Conflicts: # pkg/worker_watcher/interface.go
Diffstat (limited to 'pkg/pool')
-rwxr-xr-xpkg/pool/supervisor_pool.go39
-rw-r--r--pkg/pool/supervisor_test.go54
2 files changed, 87 insertions, 6 deletions
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index b09b6f6c..4b990dbe 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -121,7 +121,8 @@ func (sp *supervised) control() { //nolint:gocognit
worker.StateDestroyed,
worker.StateInactive,
worker.StateStopped,
- worker.StateStopping:
+ worker.StateStopping,
+ worker.StateKilling:
continue
}
@@ -132,23 +133,40 @@ func (sp *supervised) control() { //nolint:gocognit
}
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- // SOFT termination. DO NOT STOP active workers
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
+ // just to double check
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- // SOFT termination. DO NOT STOP active workers
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
-
- // mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done
+ // just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
@@ -190,11 +208,20 @@ func (sp *supervised) control() { //nolint:gocognit
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
+ /*
+ worker at this point might be in the middle of request execution:
+
+ ---> REQ ---> WORKER -----------------> RESP (at this point we should not set the Ready state) ------> | ----> Worker gets between supervisor checks and get killed in the ww.Push
+ ^
+ TTL Reached, state - invalid |
+ -----> Worker Stopped here
+ */
+
if workers[i].State().Value() != worker.StateWorking {
workers[i].State().Set(worker.StateInvalid)
_ = workers[i].Stop()
}
-
+ // just to double check
workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index c33aa6fb..a321fdf0 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -9,7 +9,9 @@ import (
"github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/transport/pipe"
+ "github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
var cfgSupervised = &Config{
@@ -122,6 +124,58 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
assert.NotEqual(t, pid, p.Workers()[0].Pid())
}
+func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: uint64(1),
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1 * time.Second,
+ TTL: 5 * time.Second,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep-ttl.php") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+ pid = p.Workers()[0].Pid()
+
+ resp, err = p.Exec(payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.NoError(t, err)
+ assert.Equal(t, string(resp.Body), "hello world")
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+ require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady)
+
+ p.Destroy(context.Background())
+}
+
func TestSupervisedPool_Idle(t *testing.T) {
var cfgExecTTL = &Config{
NumWorkers: uint64(1),