diff options
author | Valery Piashchynski <[email protected]> | 2021-01-25 14:50:21 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-25 14:50:21 +0300 |
commit | bb9e34db0f96295c5c2104262f43a3ab0edbc060 (patch) | |
tree | a9b0b99a36b796fdeaac130c9330de10aa4d5c0e /pkg | |
parent | 709f7223fca5e60793ad9b3192f92a554854d6ee (diff) |
Add new Supervisor test in the http plugin
Uniform supervisor config keys to use same notation as pool (10s, 10h
not just 10)
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/pool/config.go | 18 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 4 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 4 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 8 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 50 | ||||
-rw-r--r-- | pkg/transport/pipe/pipe_factory_spawn_test.go | 6 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 6 | ||||
-rwxr-xr-x | pkg/worker_watcher/worker_watcher.go | 4 |
8 files changed, 50 insertions, 50 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go index cf4aaaee..782f7ce9 100644 --- a/pkg/pool/config.go +++ b/pkg/pool/config.go @@ -12,12 +12,12 @@ type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. - NumWorkers int64 `mapstructure:"num_workers"` + NumWorkers uint64 `mapstructure:"num_workers"` // MaxJobs defines how many executions is allowed for the worker until // it's destruction. set 1 to create new process for each new task, 0 to let // worker handle as many tasks as it can. - MaxJobs int64 `mapstructure:"max_jobs"` + MaxJobs uint64 `mapstructure:"max_jobs"` // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. Defaults to 60s. @@ -34,7 +34,7 @@ type Config struct { // InitDefaults enables default config values. func (cfg *Config) InitDefaults() { if cfg.NumWorkers == 0 { - cfg.NumWorkers = int64(runtime.NumCPU()) + cfg.NumWorkers = uint64(runtime.NumCPU()) } if cfg.AllocateTimeout == 0 { @@ -52,24 +52,24 @@ func (cfg *Config) InitDefaults() { type SupervisorConfig struct { // WatchTick defines how often to check the state of worker. - WatchTick uint64 + WatchTick time.Duration `mapstructure:"watch_tick"` // TTL defines maximum time worker is allowed to live. - TTL uint64 + TTL time.Duration `mapstructure:"ttl"` // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL uint64 + IdleTTL time.Duration `mapstructure:"idle_ttl"` // ExecTTL defines maximum lifetime per job. - ExecTTL uint64 + ExecTTL time.Duration `mapstructure:"exec_ttl"` // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 + MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"` } // InitDefaults enables default config values. func (cfg *SupervisorConfig) InitDefaults() { if cfg.WatchTick == 0 { - cfg.WatchTick = 1 + cfg.WatchTick = time.Second } } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 7f66eaac..44adf9c0 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -310,12 +310,12 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.SyncWorker, error) { +func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) { const op = errors.Op("allocate workers") var workers []worker.SyncWorker // constant number of stack simplify logic - for i := int64(0); i < numWorkers; i++ { + for i := uint64(0); i < numWorkers; i++ { w, err := sp.allocator() if err != nil { return nil, errors.E(op, errors.WorkerAllocate, err) diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index a877b28f..a32790e0 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -20,7 +20,7 @@ import ( ) var cfg = Config{ - NumWorkers: int64(runtime.NumCPU()), + NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 5, DestroyTimeout: time.Second * 5, } @@ -596,7 +596,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, pipe.NewPipeFactory(), Config{ - NumWorkers: int64(runtime.NumCPU()), + NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, }, diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index 583d05b4..2597b352 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -56,7 +56,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) } c := make(chan ttlExec, 1) - ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second) + ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) defer cancel() go func() { res, err := sp.pool.ExecWithContext(ctx, rqs) @@ -116,7 +116,7 @@ func (sp *supervised) Destroy(ctx context.Context) { func (sp *supervised) Start() { go func() { - watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second) + watchTout := time.NewTicker(sp.cfg.WatchTick) for { select { case <-sp.stopCh: @@ -154,7 +154,7 @@ func (sp *supervised) control() { continue } - if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { err = sp.pool.RemoveWorker(workers[i]) if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) @@ -209,7 +209,7 @@ func (sp *supervised) control() { // IdleTTL is 1 second. // After the control check, res will be 5, idle is 1 // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. - if int64(sp.cfg.IdleTTL)-res <= 0 { + if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { err = sp.pool.RemoveWorker(workers[i]) if err != nil { sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 73ab4927..c67d5d91 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -14,14 +14,14 @@ import ( ) var cfgSupervised = Config{ - NumWorkers: int64(1), + NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, Supervisor: &SupervisorConfig{ - WatchTick: 1, - TTL: 100, - IdleTTL: 100, - ExecTTL: 100, + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 100 * time.Second, MaxWorkerMemory: 100, }, } @@ -74,14 +74,14 @@ func TestSupervisedPool_Exec(t *testing.T) { func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { var cfgExecTTL = Config{ - NumWorkers: int64(1), + NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, Supervisor: &SupervisorConfig{ - WatchTick: 1, - TTL: 100, - IdleTTL: 100, - ExecTTL: 1, + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 1 * time.Second, MaxWorkerMemory: 100, }, } @@ -115,14 +115,14 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { func TestSupervisedPool_Idle(t *testing.T) { var cfgExecTTL = Config{ - NumWorkers: int64(1), + NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, Supervisor: &SupervisorConfig{ - WatchTick: 1, - TTL: 100, - IdleTTL: 1, - ExecTTL: 100, + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 1 * time.Second, + ExecTTL: 100 * time.Second, MaxWorkerMemory: 100, }, } @@ -156,14 +156,14 @@ func TestSupervisedPool_Idle(t *testing.T) { func TestSupervisedPool_ExecTTL_OK(t *testing.T) { var cfgExecTTL = Config{ - NumWorkers: int64(1), + NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, Supervisor: &SupervisorConfig{ - WatchTick: 1, - TTL: 100, - IdleTTL: 100, - ExecTTL: 4, + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, MaxWorkerMemory: 100, }, } @@ -198,14 +198,14 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { func TestSupervisedPool_MaxMemoryReached(t *testing.T) { var cfgExecTTL = Config{ - NumWorkers: int64(1), + NumWorkers: uint64(1), AllocateTimeout: time.Second, DestroyTimeout: time.Second, Supervisor: &SupervisorConfig{ - WatchTick: 1, - TTL: 100, - IdleTTL: 100, - ExecTTL: 4, + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, MaxWorkerMemory: 1, }, } diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index 2e5bbcd5..d4949c82 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -440,17 +440,17 @@ func Test_NumExecs2(t *testing.T) { if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(1), w.State().NumExecs()) + assert.Equal(t, uint64(1), w.State().NumExecs()) _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(2), w.State().NumExecs()) + assert.Equal(t, uint64(2), w.State().NumExecs()) _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(3), w.State().NumExecs()) + assert.Equal(t, uint64(3), w.State().NumExecs()) } diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index fa37ac0f..38166b85 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -462,17 +462,17 @@ func Test_NumExecs(t *testing.T) { if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(1), w.State().NumExecs()) + assert.Equal(t, uint64(1), w.State().NumExecs()) _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(2), w.State().NumExecs()) + assert.Equal(t, uint64(2), w.State().NumExecs()) _, err = sw.Exec(payload.Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } - assert.Equal(t, int64(3), w.State().NumExecs()) + assert.Equal(t, uint64(3), w.State().NumExecs()) } diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go index 2c3d512d..753b61ee 100755 --- a/pkg/worker_watcher/worker_watcher.go +++ b/pkg/worker_watcher/worker_watcher.go @@ -11,9 +11,9 @@ import ( ) // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher { +func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher { ww := &workerWatcher{ - stack: NewWorkersStack(uint64(numWorkers)), + stack: NewWorkersStack(numWorkers), allocator: allocator, events: events, } |