diff options
author | Valery Piashchynski <[email protected]> | 2021-01-13 15:30:54 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-01-13 15:30:54 +0300 |
commit | 2a1c6092056c9dc1d725e393da97b72eb65071c4 (patch) | |
tree | 362f0eacdf2373bf208441577c1e69b8337bd71e /pkg | |
parent | f0f2b1aaf8e4df2ab65c6c47d9183f072ac86841 (diff) | |
parent | 2eed81d8fdbf8ee5134bb3b3f4c11c63cf6d757c (diff) |
Merge pull request #473 from spiral/feature/env_variables
feat(env): Add RR system environment variables
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/pool/config.go | 20 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 27 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 41 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 3 |
4 files changed, 74 insertions, 17 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go index 3dcc3584..acdd3d6f 100644 --- a/pkg/pool/config.go +++ b/pkg/pool/config.go @@ -12,23 +12,23 @@ type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. - NumWorkers int64 + NumWorkers int64 `yaml:"num_workers"` // MaxJobs defines how many executions is allowed for the worker until // it's destruction. set 1 to create new process for each new task, 0 to let // worker handle as many tasks as it can. - MaxJobs int64 + MaxJobs int64 `yaml:"max_jobs"` // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. Defaults to 60s. - AllocateTimeout time.Duration + AllocateTimeout time.Duration `yaml:"allocate_timeout"` // DestroyTimeout defines for how long pool should be waiting for worker to // properly destroy, if timeout reached worker will be killed. Defaults to 60s. - DestroyTimeout time.Duration + DestroyTimeout time.Duration `yaml:"destroy_timeout"` // Supervision config to limit worker and pool memory usage. - Supervisor *SupervisorConfig + Supervisor *SupervisorConfig `yaml:"supervisor"` } // InitDefaults enables default config values. @@ -52,19 +52,19 @@ func (cfg *Config) InitDefaults() { type SupervisorConfig struct { // WatchTick defines how often to check the state of worker. - WatchTick uint64 + WatchTick uint64 `yaml:"watch_tick"` // TTL defines maximum time worker is allowed to live. - TTL uint64 + TTL uint64 `yaml:"ttl"` // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL uint64 + IdleTTL uint64 `yaml:"idle_ttl"` // ExecTTL defines maximum lifetime per job. - ExecTTL uint64 + ExecTTL uint64 `yaml:"exec_ttl"` // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 + MaxWorkerMemory uint64 `yaml:"max_worker_memory"` } // InitDefaults enables default config values. diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 378be7dd..07fa7019 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -16,6 +16,9 @@ import ( const MB = 1024 * 1024 +// NSEC_IN_SEC nanoseconds in second +const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck + type Supervised interface { pool.Pool // Start used to start watching process for all pool workers @@ -54,7 +57,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) } c := make(chan ttlExec, 1) - ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL)) + ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second) defer cancel() go func() { res, err := sp.pool.ExecWithContext(ctx, rqs) @@ -114,7 +117,7 @@ func (sp *supervised) Destroy(ctx context.Context) { func (sp *supervised) Start() { go func() { - watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick)) + watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second) for { select { case <-sp.stopCh: @@ -186,14 +189,28 @@ func (sp *supervised) control() { we are guessing that worker overlap idle time and has to be killed */ + // 1610530005534416045 lu + // lu - now = -7811150814 - nanoseconds + // 7.8 seconds // get last used unix nano lu := workers[i].State().LastUsed() + // worker not used, skip + if lu == 0 { + continue + } - // convert last used to unixNano and sub time.now - res := int64(lu) - now.UnixNano() + // convert last used to unixNano and sub time.now to seconds + // negative number, because lu always in the past, except for the `back to the future` :) + res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1 // maxWorkerIdle more than diff between now and last used - if sp.cfg.IdleTTL-uint64(res) <= 0 { + // for example: + // After exec worker goes to the rest + // And resting for the 5 seconds + // IdleTTL is 1 second. + // After the control check, res will be 5, idle is 1 + // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. + if int64(sp.cfg.IdleTTL)-res <= 0 { err = sp.pool.RemoveWorker(workers[i]) if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index cb67ebe1..72226bee 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -112,6 +112,47 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { assert.NotEqual(t, pid, p.Workers()[0].Pid()) } +func TestSupervisedPool_Idle(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: int64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1, + TTL: 100, + IdleTTL: 1, + ExecTTL: 100, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Nil(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 5) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) +} + func TestSupervisedPool_ExecTTL_OK(t *testing.T) { var cfgExecTTL = Config{ NumWorkers: int64(1), diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 348f0459..127dc801 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -236,7 +236,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { pid := wb.Pid() if ww.stack.FindAndRemoveByPid(pid) { - wb.State().Set(internal.StateInvalid) + wb.State().Set(internal.StateRemove) err := wb.Kill() if err != nil { return errors.E(op, err) @@ -244,7 +244,6 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error { return nil } - wb.State().Set(internal.StateRemove) return nil } |