diff options
author | Valery Piashchynski <[email protected]> | 2021-01-13 13:56:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-13 13:56:40 +0300 |
commit | 2eed81d8fdbf8ee5134bb3b3f4c11c63cf6d757c (patch) | |
tree | 362f0eacdf2373bf208441577c1e69b8337bd71e | |
parent | 581bb5370c89d749b77209b841fec30544c16246 (diff) |
Add IdleTTL test
Add removed state to the worker before it being removed
-rw-r--r-- | codecov.yml | 2 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 23 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 41 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 3 |
4 files changed, 63 insertions, 6 deletions
diff --git a/codecov.yml b/codecov.yml index 5243383f..eb499c3a 100644 --- a/codecov.yml +++ b/codecov.yml @@ -10,7 +10,7 @@ coverage: target: auto threshold: 0% informational: true - + # do not include tests folders ignore: - "tests" diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 2e919eae..07fa7019 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -16,6 +16,9 @@ import ( const MB = 1024 * 1024 +// NSEC_IN_SEC nanoseconds in second +const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck + type Supervised interface { pool.Pool // Start used to start watching process for all pool workers @@ -186,14 +189,28 @@ func (sp *supervised) control() { we are guessing that worker overlap idle time and has to be killed */ + // 1610530005534416045 lu + // lu - now = -7811150814 - nanoseconds + // 7.8 seconds // get last used unix nano lu := workers[i].State().LastUsed() + // worker not used, skip + if lu == 0 { + continue + } - // convert last used to unixNano and sub time.now - res := int64(lu) - now.UnixNano() + // convert last used to unixNano and sub time.now to seconds + // negative number, because lu always in the past, except for the `back to the future` :) + res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 // maxWorkerIdle more than diff between now and last used - if sp.cfg.IdleTTL-uint64(res) <= 0 { + // for example: + // After exec worker goes to the rest + // And resting for the 5 seconds + // IdleTTL is 1 second. + // After the control check, res will be 5, idle is 1 + // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. + if int64(sp.cfg.IdleTTL)-res <= 0 { err = sp.pool.RemoveWorker(workers[i]) if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index cb67ebe1..72226bee 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -112,6 +112,47 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { assert.NotEqual(t, pid, p.Workers()[0].Pid()) } +func TestSupervisedPool_Idle(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: int64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1, + TTL: 100, + IdleTTL: 1, + ExecTTL: 100, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Nil(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 5) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) +} + func TestSupervisedPool_ExecTTL_OK(t *testing.T) { var cfgExecTTL = Config{ NumWorkers: int64(1), diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 348f0459..127dc801 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -236,7 +236,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { pid := wb.Pid() if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(internal.StateInvalid) + wb.State().Set(internal.StateRemove) err := wb.Kill() if err != nil { return errors.E(op, err) @@ -244,7 +244,6 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { return nil } - wb.State().Set(internal.StateRemove) return nil } |