author     Valery Piashchynski <[email protected]>   2021-01-13 15:30:54 +0300
committer  GitHub <[email protected]>                2021-01-13 15:30:54 +0300
commit     2a1c6092056c9dc1d725e393da97b72eb65071c4 (patch)
tree       362f0eacdf2373bf208441577c1e69b8337bd71e /pkg
parent     f0f2b1aaf8e4df2ab65c6c47d9183f072ac86841 (diff)
parent     2eed81d8fdbf8ee5134bb3b3f4c11c63cf6d757c (diff)
Merge pull request #473 from spiral/feature/env_variables
feat(env): Add RR system environment variables
Diffstat (limited to 'pkg')
-rw-r--r--   pkg/pool/config.go                    20
-rwxr-xr-x   pkg/pool/supervisor_pool.go           27
-rw-r--r--   pkg/pool/supervisor_test.go           41
-rwxr-xr-x   pkg/worker_watcher/worker_watcher.go   3
4 files changed, 74 insertions(+), 17 deletions(-)
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 3dcc3584..acdd3d6f 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -12,23 +12,23 @@ type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by the Swapper during a hot-swap. Defaults to the number of CPU cores.
- NumWorkers int64
+ NumWorkers int64 `yaml:"num_workers"`
// MaxJobs defines how many executions are allowed for the worker until
// its destruction. Set to 1 to create a new process for each new task, or 0 to let
// the worker handle as many tasks as it can.
- MaxJobs int64
+ MaxJobs int64 `yaml:"max_jobs"`
// AllocateTimeout defines how long the pool will wait for a worker to
// be freed to handle the task. Defaults to 60s.
- AllocateTimeout time.Duration
+ AllocateTimeout time.Duration `yaml:"allocate_timeout"`
// DestroyTimeout defines how long the pool should wait for a worker to
// destroy itself properly; if the timeout is reached, the worker is killed. Defaults to 60s.
- DestroyTimeout time.Duration
+ DestroyTimeout time.Duration `yaml:"destroy_timeout"`
// Supervision config to limit worker and pool memory usage.
- Supervisor *SupervisorConfig
+ Supervisor *SupervisorConfig `yaml:"supervisor"`
}
// InitDefaults enables default config values.
@@ -52,19 +52,19 @@ func (cfg *Config) InitDefaults() {
type SupervisorConfig struct {
// WatchTick defines how often (in seconds) to check the state of the workers.
- WatchTick uint64
+ WatchTick uint64 `yaml:"watch_tick"`
// TTL defines the maximum time (in seconds) a worker is allowed to live.
- TTL uint64
+ TTL uint64 `yaml:"ttl"`
// IdleTTL defines the maximum duration (in seconds) a worker may spend idle. Disabled when 0.
- IdleTTL uint64
+ IdleTTL uint64 `yaml:"idle_ttl"`
// ExecTTL defines the maximum execution time (in seconds) per job.
- ExecTTL uint64
+ ExecTTL uint64 `yaml:"exec_ttl"`
// MaxWorkerMemory limits memory usage per worker (in MB).
- MaxWorkerMemory uint64
+ MaxWorkerMemory uint64 `yaml:"max_worker_memory"`
}
// InitDefaults enables default config values.
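
The yaml tags added above let these pool options be populated directly from the RoadRunner YAML configuration. A minimal sketch of the section those tags map to, assuming the pool settings live under a hypothetical pool: key; the key names come from the tags, while the nesting and values are illustrative only, and the duration syntax ("60s") assumes the config loader can parse time.Duration from such strings:

pool:
  num_workers: 4
  max_jobs: 0
  allocate_timeout: 60s
  destroy_timeout: 60s
  supervisor:
    watch_tick: 1
    ttl: 100
    idle_ttl: 10
    exec_ttl: 60
    max_worker_memory: 100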
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 378be7dd..07fa7019 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -16,6 +16,9 @@ import (
const MB = 1024 * 1024
+// NSEC_IN_SEC is the number of nanoseconds in one second
+const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
+
type Supervised interface {
pool.Pool
// Start starts the watching process for all pool workers
@@ -54,7 +57,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
c := make(chan ttlExec, 1)
- ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL))
+ ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second)
defer cancel()
go func() {
res, err := sp.pool.ExecWithContext(ctx, rqs)
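
The change above converts ExecTTL, stored as whole seconds, into a time.Duration that bounds the request context; the WatchTick ticker below uses the same seconds-to-Duration conversion. A self-contained sketch of the pattern, racing a job against the derived deadline (execWithTTL, result, and slowJob are illustrative names, not part of the RoadRunner API):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type result struct {
	body string
	err  error
}

// execWithTTL runs job and aborts it once execTTL (whole seconds) has elapsed,
// mirroring the seconds-to-Duration conversion from the diff above.
func execWithTTL(ctx context.Context, execTTL uint64, job func(context.Context) (string, error)) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, time.Duration(execTTL)*time.Second)
	defer cancel()

	c := make(chan result, 1) // buffered, so the goroutine never leaks
	go func() {
		body, err := job(ctx)
		c <- result{body: body, err: err}
	}()

	select {
	case <-ctx.Done():
		return "", errors.New("exec TTL reached")
	case res := <-c:
		return res.body, res.err
	}
}

func main() {
	slowJob := func(ctx context.Context) (string, error) {
		select {
		case <-time.After(2 * time.Second):
			return "done", nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	_, err := execWithTTL(context.Background(), 1, slowJob)
	fmt.Println(err) // exec TTL reached
}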
@@ -114,7 +117,7 @@ func (sp *supervised) Destroy(ctx context.Context) {
func (sp *supervised) Start() {
go func() {
- watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
+ watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second)
for {
select {
case <-sp.stopCh:
@@ -186,14 +189,28 @@ func (sp *supervised) control() {
we assume that the worker has exceeded its idle time and has to be killed
*/
+ // example: lu = 1610530005534416045 (unix nano)
+ // lu - now = -7811150814 nanoseconds
+ // i.e. the worker was last used ~7.8 seconds ago
// get last used unix nano
lu := workers[i].State().LastUsed()
+ // worker has not been used yet, skip it
+ if lu == 0 {
+ continue
+ }
- // convert last used to unixNano and sub time.now
- res := int64(lu) - now.UnixNano()
+ // subtract time.Now (unix nano) from last used and convert the difference to seconds
+ // the result is negative because lu is always in the past (barring `back to the future` :), so negate it
+ res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1
// remove the worker when the time since last use reaches or exceeds IdleTTL
- if sp.cfg.IdleTTL-uint64(res) <= 0 {
+ // for example:
+ // after an exec the worker goes to rest
+ // and has been resting for 5 seconds
+ // while IdleTTL is 1 second
+ // at the control check res will be 5 and IdleTTL is 1
+ // 1 - 5 = -4, which is <= 0, so YOU ARE FIRED (the worker is removed). Done.
+ if int64(sp.cfg.IdleTTL)-res <= 0 {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
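
The arithmetic above converts LastUsed (unix nanoseconds) into whole seconds of idle time and removes the worker once IdleTTL is exceeded. A self-contained sketch of that check, reproducing the 5s-idle vs. 1s-IdleTTL example from the comments (shouldRemove is an illustrative helper, not RoadRunner API):

package main

import (
	"fmt"
	"time"
)

// nsecInSec mirrors the NSEC_IN_SEC constant from the diff.
const nsecInSec int64 = 1000000000

// shouldRemove reports whether a worker that was last used at lastUsed
// (unix nanoseconds) has been idle for at least idleTTL seconds.
func shouldRemove(lastUsed uint64, now time.Time, idleTTL uint64) bool {
	if lastUsed == 0 {
		// worker has not served a request yet
		return false
	}
	// lastUsed - now is negative (lastUsed is in the past), so negate it
	// to get the number of whole seconds since the worker was last used
	idleFor := ((int64(lastUsed) - now.UnixNano()) / nsecInSec) * -1
	return int64(idleTTL)-idleFor <= 0
}

func main() {
	now := time.Now()
	lastUsed := uint64(now.Add(-5 * time.Second).UnixNano()) // resting for 5 seconds

	fmt.Println(shouldRemove(lastUsed, now, 1))  // IdleTTL = 1  -> true, worker is removed
	fmt.Println(shouldRemove(lastUsed, now, 10)) // IdleTTL = 10 -> false, worker stays
}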
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index cb67ebe1..72226bee 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -112,6 +112,47 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
assert.NotEqual(t, pid, p.Workers()[0].Pid())
}
+func TestSupervisedPool_Idle(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 1,
+ ExecTTL: 100,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.ExecWithContext(context.Background(), payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Nil(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 5)
+ // the idle worker should have been replaced by a new worker with a new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}
+
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: int64(1),
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 348f0459..127dc801 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -236,7 +236,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
pid := wb.Pid()
if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(internal.StateInvalid)
+ wb.State().Set(internal.StateRemove)
err := wb.Kill()
if err != nil {
return errors.E(op, err)
@@ -244,7 +244,6 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
return nil
}
- wb.State().Set(internal.StateRemove)
return nil
}