summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-25 14:50:21 +0300
committerValery Piashchynski <[email protected]>2021-01-25 14:50:21 +0300
commitbb9e34db0f96295c5c2104262f43a3ab0edbc060 (patch)
treea9b0b99a36b796fdeaac130c9330de10aa4d5c0e /pkg
parent709f7223fca5e60793ad9b3192f92a554854d6ee (diff)
Add new Supervisor test in the http plugin
Uniform supervisor config keys to use same notation as pool (10s, 10h not just 10)
Diffstat (limited to 'pkg')
-rw-r--r--pkg/pool/config.go18
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/static_pool_test.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go8
-rw-r--r--pkg/pool/supervisor_test.go50
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go6
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go6
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go4
8 files changed, 50 insertions, 50 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index cf4aaaee..782f7ce9 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -12,12 +12,12 @@ type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
- NumWorkers int64 `mapstructure:"num_workers"`
+ NumWorkers uint64 `mapstructure:"num_workers"`
// MaxJobs defines how many executions is allowed for the worker until
// it's destruction. set 1 to create new process for each new task, 0 to let
// worker handle as many tasks as it can.
- MaxJobs int64 `mapstructure:"max_jobs"`
+ MaxJobs uint64 `mapstructure:"max_jobs"`
// AllocateTimeout defines for how long pool will be waiting for a worker to
// be freed to handle the task. Defaults to 60s.
@@ -34,7 +34,7 @@ type Config struct {
// InitDefaults enables default config values.
func (cfg *Config) InitDefaults() {
if cfg.NumWorkers == 0 {
- cfg.NumWorkers = int64(runtime.NumCPU())
+ cfg.NumWorkers = uint64(runtime.NumCPU())
}
if cfg.AllocateTimeout == 0 {
@@ -52,24 +52,24 @@ func (cfg *Config) InitDefaults() {
type SupervisorConfig struct {
// WatchTick defines how often to check the state of worker.
- WatchTick uint64
+ WatchTick time.Duration `mapstructure:"watch_tick"`
// TTL defines maximum time worker is allowed to live.
- TTL uint64
+ TTL time.Duration `mapstructure:"ttl"`
// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL uint64
+ IdleTTL time.Duration `mapstructure:"idle_ttl"`
// ExecTTL defines maximum lifetime per job.
- ExecTTL uint64
+ ExecTTL time.Duration `mapstructure:"exec_ttl"`
// MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64
+ MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
}
// InitDefaults enables default config values.
func (cfg *SupervisorConfig) InitDefaults() {
if cfg.WatchTick == 0 {
- cfg.WatchTick = 1
+ cfg.WatchTick = time.Second
}
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 7f66eaac..44adf9c0 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -310,12 +310,12 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.SyncWorker, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) {
const op = errors.Op("allocate workers")
var workers []worker.SyncWorker
// constant number of stack simplify logic
- for i := int64(0); i < numWorkers; i++ {
+ for i := uint64(0); i < numWorkers; i++ {
w, err := sp.allocator()
if err != nil {
return nil, errors.E(op, errors.WorkerAllocate, err)
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index a877b28f..a32790e0 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -20,7 +20,7 @@ import (
)
var cfg = Config{
- NumWorkers: int64(runtime.NumCPU()),
+ NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
}
@@ -596,7 +596,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
Config{
- NumWorkers: int64(runtime.NumCPU()),
+ NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
},
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 583d05b4..2597b352 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -56,7 +56,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
c := make(chan ttlExec, 1)
- ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second)
+ ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
defer cancel()
go func() {
res, err := sp.pool.ExecWithContext(ctx, rqs)
@@ -116,7 +116,7 @@ func (sp *supervised) Destroy(ctx context.Context) {
func (sp *supervised) Start() {
go func() {
- watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second)
+ watchTout := time.NewTicker(sp.cfg.WatchTick)
for {
select {
case <-sp.stopCh:
@@ -154,7 +154,7 @@ func (sp *supervised) control() {
continue
}
- if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
@@ -209,7 +209,7 @@ func (sp *supervised) control() {
// IdleTTL is 1 second.
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
- if int64(sp.cfg.IdleTTL)-res <= 0 {
+ if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 73ab4927..c67d5d91 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -14,14 +14,14 @@ import (
)
var cfgSupervised = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 100,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 100 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -74,14 +74,14 @@ func TestSupervisedPool_Exec(t *testing.T) {
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 1,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 1 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -115,14 +115,14 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
func TestSupervisedPool_Idle(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 1,
- ExecTTL: 100,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 1 * time.Second,
+ ExecTTL: 100 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -156,14 +156,14 @@ func TestSupervisedPool_Idle(t *testing.T) {
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 4,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -198,14 +198,14 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 4,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
MaxWorkerMemory: 1,
},
}
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 2e5bbcd5..d4949c82 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -440,17 +440,17 @@ func Test_NumExecs2(t *testing.T) {
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(1), w.State().NumExecs())
+ assert.Equal(t, uint64(1), w.State().NumExecs())
_, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(2), w.State().NumExecs())
+ assert.Equal(t, uint64(2), w.State().NumExecs())
_, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(3), w.State().NumExecs())
+ assert.Equal(t, uint64(3), w.State().NumExecs())
}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index fa37ac0f..38166b85 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -462,17 +462,17 @@ func Test_NumExecs(t *testing.T) {
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(1), w.State().NumExecs())
+ assert.Equal(t, uint64(1), w.State().NumExecs())
_, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(2), w.State().NumExecs())
+ assert.Equal(t, uint64(2), w.State().NumExecs())
_, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(3), w.State().NumExecs())
+ assert.Equal(t, uint64(3), w.State().NumExecs())
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 2c3d512d..753b61ee 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -11,9 +11,9 @@ import (
)
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
- stack: NewWorkersStack(uint64(numWorkers)),
+ stack: NewWorkersStack(numWorkers),
allocator: allocator,
events: events,
}