| author    | Valery Piashchynski <[email protected]> | 2021-01-25 14:50:21 +0300 |
|-----------|---------------------------------------------|---------------------------|
| committer | Valery Piashchynski <[email protected]> | 2021-01-25 14:50:21 +0300 |
| commit    | bb9e34db0f96295c5c2104262f43a3ab0edbc060 (patch) | |
| tree      | a9b0b99a36b796fdeaac130c9330de10aa4d5c0e | |
| parent    | 709f7223fca5e60793ad9b3192f92a554854d6ee (diff) | |
Add a new supervisor test in the http plugin.

Make the supervisor config keys use the same duration notation as the pool (10s, 10h, not just 10).
24 files changed, 240 insertions(+), 175 deletions(-)
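The functional core of the change is that supervisor keys are now decoded as durations (`watch_tick: 1s`) instead of bare integers, matching the pool's `allocate_timeout`/`destroy_timeout` notation. Below is a minimal sketch of how such values decode, assuming viper's default decode hooks (which include `mapstructure.StringToTimeDurationHookFunc`); the embedded YAML, the `http.pool.supervisor` key path, and the standalone `main` are illustrative only, not the plugin's actual wiring.

```go
package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/spf13/viper"
)

// SupervisorConfig mirrors the duration-based fields this commit introduces in pkg/pool/config.go.
type SupervisorConfig struct {
	WatchTick       time.Duration `mapstructure:"watch_tick"`
	TTL             time.Duration `mapstructure:"ttl"`
	IdleTTL         time.Duration `mapstructure:"idle_ttl"`
	ExecTTL         time.Duration `mapstructure:"exec_ttl"`
	MaxWorkerMemory uint64        `mapstructure:"max_worker_memory"`
}

func main() {
	// Illustrative config fragment using the new notation (10s, not 10).
	yamlDoc := `
http:
  pool:
    supervisor:
      watch_tick: 1s
      ttl: 0
      idle_ttl: 10s
      exec_ttl: 10s
      max_worker_memory: 100
`
	v := viper.New()
	v.SetConfigType("yaml")
	if err := v.ReadConfig(strings.NewReader(yamlDoc)); err != nil {
		panic(err)
	}

	cfg := &SupervisorConfig{}
	// Viper's default decode hooks turn "1s"/"10s" strings into time.Duration.
	if err := v.UnmarshalKey("http.pool.supervisor", cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.WatchTick, cfg.IdleTTL, cfg.ExecTTL) // 1s 10s 10s
}
```

Note that a plain integer such as `ttl: 0` still decodes (to a zero duration), while an unqualified `10` would be read as 10 nanoseconds rather than 10 seconds, which is why the test configs below move to explicit `10s`-style values.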
@@ -9,7 +9,7 @@ server:
     "RR_HTTP": "true"
     "RR_RPC": "tcp://127.0.0.1:6001"
   relay: "pipes"
-  relay_timeout: "20s"
+  relay_timeout: 20s
 
 logs:
   mode: development
@@ -44,7 +44,6 @@ http:
       input: "custom-header"
     response:
       output: "output-header"
-
   static:
     dir: "tests"
     forbid: [ "" ]
@@ -59,15 +58,15 @@ http:
     allocate_timeout: 60s
     destroy_timeout: 60s
     supervisor:
-      # WatchTick defines how often to check the state of worker (seconds)
-      watch_tick: 1
-      # TTL defines maximum time worker is allowed to live (seconds)
+      # watch_tick defines how often to check the state of the workers (seconds)
+      watch_tick: 1s
+      # ttl defines maximum time worker is allowed to live (seconds)
       ttl: 0
-      # IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0 (seconds)
-      idle_ttl: 10
-      # ExecTTL defines maximum lifetime per job (seconds)
-      exec_ttl: 10
-      # MaxWorkerMemory limits memory per worker (MB)
+      # idle_ttl defines maximum duration worker can spend in idle mode after first use. Disabled when 0 (seconds)
+      idle_ttl: 10s
+      # exec_ttl defines maximum lifetime per job (seconds)
+      exec_ttl: 10s
+      # max_worker_memory limits memory usage per worker (MB)
       max_worker_memory: 100
 
   ssl:
@@ -76,7 +75,7 @@ http:
     redirect: false
     cert: fixtures/server.crt
     key: fixtures/server.key
-    rootCa: root.crt
+    root_ca: root.crt
   fcgi:
     address: tcp://0.0.0.0:7921
   http2:
diff --git a/internal/state.go b/internal/state.go
index 8f7d939b..a14a6937 100755
--- a/internal/state.go
+++ b/internal/state.go
@@ -13,7 +13,7 @@ type State interface {
     // Set sets the WorkerState
     Set(value int64)
     // NumJobs shows how many times WorkerProcess was invoked
-    NumExecs() int64
+    NumExecs() uint64
     // IsActive returns true if WorkerProcess not Inactive or Stopped
     IsActive() bool
     // RegisterExec using to registering php executions
@@ -56,7 +56,7 @@ const (
 
 type WorkerState struct {
     value int64
-    numExecs int64
+    numExecs uint64
     // to be lightweight, use UnixNano
     lastUsed uint64
 }
@@ -87,8 +87,8 @@ func (s *WorkerState) String() string {
 }
 
 // NumExecs returns number of registered WorkerProcess execs.
-func (s *WorkerState) NumExecs() int64 {
-    return atomic.LoadInt64(&s.numExecs)
+func (s *WorkerState) NumExecs() uint64 {
+    return atomic.LoadUint64(&s.numExecs)
 }
 
 // Value WorkerState returns WorkerState value
@@ -109,7 +109,7 @@ func (s *WorkerState) Set(value int64) {
 
 // register new execution atomically
 func (s *WorkerState) RegisterExec() {
-    atomic.AddInt64(&s.numExecs, 1)
+    atomic.AddUint64(&s.numExecs, 1)
 }
 
 // Update last used time
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index cf4aaaee..782f7ce9 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -12,12 +12,12 @@ type Config struct {
 
     // NumWorkers defines how many sub-processes can be run at once. This value
     // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
-    NumWorkers int64 `mapstructure:"num_workers"`
+    NumWorkers uint64 `mapstructure:"num_workers"`
 
     // MaxJobs defines how many executions is allowed for the worker until
     // it's destruction. set 1 to create new process for each new task, 0 to let
     // worker handle as many tasks as it can.
-    MaxJobs int64 `mapstructure:"max_jobs"`
+    MaxJobs uint64 `mapstructure:"max_jobs"`
 
     // AllocateTimeout defines for how long pool will be waiting for a worker to
     // be freed to handle the task. Defaults to 60s.
@@ -34,7 +34,7 @@ type Config struct {
 // InitDefaults enables default config values.
 func (cfg *Config) InitDefaults() {
     if cfg.NumWorkers == 0 {
-        cfg.NumWorkers = int64(runtime.NumCPU())
+        cfg.NumWorkers = uint64(runtime.NumCPU())
     }
 
     if cfg.AllocateTimeout == 0 {
@@ -52,24 +52,24 @@ func (cfg *Config) InitDefaults() {
 
 type SupervisorConfig struct {
     // WatchTick defines how often to check the state of worker.
-    WatchTick uint64
+    WatchTick time.Duration `mapstructure:"watch_tick"`
 
     // TTL defines maximum time worker is allowed to live.
-    TTL uint64
+    TTL time.Duration `mapstructure:"ttl"`
 
     // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
-    IdleTTL uint64
+    IdleTTL time.Duration `mapstructure:"idle_ttl"`
 
     // ExecTTL defines maximum lifetime per job.
-    ExecTTL uint64
+    ExecTTL time.Duration `mapstructure:"exec_ttl"`
 
     // MaxWorkerMemory limits memory per worker.
-    MaxWorkerMemory uint64
+    MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
 }
 
 // InitDefaults enables default config values.
 func (cfg *SupervisorConfig) InitDefaults() {
     if cfg.WatchTick == 0 {
-        cfg.WatchTick = 1
+        cfg.WatchTick = time.Second
     }
 }
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 7f66eaac..44adf9c0 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -310,12 +310,12 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
 }
 
 // allocate required number of stack
-func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.SyncWorker, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) {
     const op = errors.Op("allocate workers")
     var workers []worker.SyncWorker
 
     // constant number of stack simplify logic
-    for i := int64(0); i < numWorkers; i++ {
+    for i := uint64(0); i < numWorkers; i++ {
         w, err := sp.allocator()
         if err != nil {
             return nil, errors.E(op, errors.WorkerAllocate, err)
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index a877b28f..a32790e0 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -20,7 +20,7 @@ import (
 )
 
 var cfg = Config{
-    NumWorkers: int64(runtime.NumCPU()),
+    NumWorkers: uint64(runtime.NumCPU()),
     AllocateTimeout: time.Second * 5,
     DestroyTimeout: time.Second * 5,
 }
@@ -596,7 +596,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
     func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
     pipe.NewPipeFactory(),
     Config{
-        NumWorkers: int64(runtime.NumCPU()),
+        NumWorkers: uint64(runtime.NumCPU()),
         AllocateTimeout: time.Second * 100,
         DestroyTimeout: time.Second,
     },
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 583d05b4..2597b352 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -56,7 +56,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
     }
 
     c := make(chan ttlExec, 1)
-    ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second)
+    ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
     defer cancel()
     go func() {
         res, err := sp.pool.ExecWithContext(ctx, rqs)
@@ -116,7 +116,7 @@ func (sp *supervised) Destroy(ctx context.Context) {
 
 func (sp *supervised) Start() {
     go func() {
-        watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second)
+        watchTout := time.NewTicker(sp.cfg.WatchTick)
         for {
             select {
             case <-sp.stopCh:
@@ -154,7 +154,7 @@ func (sp *supervised) control() {
             continue
         }
 
-        if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+        if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
             err = sp.pool.RemoveWorker(workers[i])
             if err != nil {
                 sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
@@ -209,7 +209,7 @@ func (sp *supervised) control() {
             // IdleTTL is 1 second.
             // After the control check, res will be 5, idle is 1
             // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
-            if int64(sp.cfg.IdleTTL)-res <= 0 {
+            if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
                 err = sp.pool.RemoveWorker(workers[i])
                 if err != nil {
                     sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 73ab4927..c67d5d91 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -14,14 +14,14 @@ import (
 )
 
 var cfgSupervised = Config{
-    NumWorkers: int64(1),
+    NumWorkers: uint64(1),
     AllocateTimeout: time.Second,
     DestroyTimeout: time.Second,
     Supervisor: &SupervisorConfig{
-        WatchTick: 1,
-        TTL: 100,
-        IdleTTL: 100,
-        ExecTTL: 100,
+        WatchTick: 1 * time.Second,
+        TTL: 100 * time.Second,
+        IdleTTL: 100 * time.Second,
+        ExecTTL: 100 * time.Second,
         MaxWorkerMemory: 100,
     },
 }
@@ -74,14 +74,14 @@ func TestSupervisedPool_Exec(t *testing.T) {
 
 func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
     var cfgExecTTL = Config{
-        NumWorkers: int64(1),
+        NumWorkers: uint64(1),
         AllocateTimeout: time.Second,
         DestroyTimeout: time.Second,
         Supervisor: &SupervisorConfig{
-            WatchTick: 1,
-            TTL: 100,
-            IdleTTL: 100,
-            ExecTTL: 1,
+            WatchTick: 1 * time.Second,
+            TTL: 100 * time.Second,
+            IdleTTL: 100 * time.Second,
+            ExecTTL: 1 * time.Second,
             MaxWorkerMemory: 100,
         },
     }
@@ -115,14 +115,14 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
 
 func TestSupervisedPool_Idle(t *testing.T) {
     var cfgExecTTL = Config{
-        NumWorkers: int64(1),
+        NumWorkers: uint64(1),
         AllocateTimeout: time.Second,
         DestroyTimeout: time.Second,
         Supervisor: &SupervisorConfig{
-            WatchTick: 1,
-            TTL: 100,
-            IdleTTL: 1,
-            ExecTTL: 100,
+            WatchTick: 1 * time.Second,
+            TTL: 100 * time.Second,
+            IdleTTL: 1 * time.Second,
+            ExecTTL: 100 * time.Second,
             MaxWorkerMemory: 100,
         },
     }
@@ -156,14 +156,14 @@ func TestSupervisedPool_Idle(t *testing.T) {
 
 func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
     var cfgExecTTL = Config{
-        NumWorkers: int64(1),
+        NumWorkers: uint64(1),
         AllocateTimeout: time.Second,
         DestroyTimeout: time.Second,
         Supervisor: &SupervisorConfig{
-            WatchTick: 1,
-            TTL: 100,
-            IdleTTL: 100,
-            ExecTTL: 4,
+            WatchTick: 1 * time.Second,
+            TTL: 100 * time.Second,
+            IdleTTL: 100 * time.Second,
+            ExecTTL: 4 * time.Second,
             MaxWorkerMemory: 100,
         },
     }
@@ -198,14 +198,14 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
 
 func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
     var cfgExecTTL = Config{
-        NumWorkers: int64(1),
+        NumWorkers: uint64(1),
         AllocateTimeout: time.Second,
         DestroyTimeout: time.Second,
         Supervisor: &SupervisorConfig{
-            WatchTick: 1,
-            TTL: 100,
-            IdleTTL: 100,
-            ExecTTL: 4,
+            WatchTick: 1 * time.Second,
+            TTL: 100 * time.Second,
+            IdleTTL: 100 * time.Second,
+            ExecTTL: 4 * time.Second,
             MaxWorkerMemory: 1,
         },
     }
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 2e5bbcd5..d4949c82 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -440,17 +440,17 @@ func Test_NumExecs2(t *testing.T) {
     if err != nil {
         t.Errorf("fail to execute payload: error %v", err)
     }
-    assert.Equal(t, int64(1), w.State().NumExecs())
+    assert.Equal(t, uint64(1), w.State().NumExecs())
 
     _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
     if err != nil {
         t.Errorf("fail to execute payload: error %v", err)
     }
-    assert.Equal(t, int64(2), w.State().NumExecs())
+    assert.Equal(t, uint64(2), w.State().NumExecs())
 
     _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
     if err != nil {
         t.Errorf("fail to execute payload: error %v", err)
     }
-    assert.Equal(t, int64(3), w.State().NumExecs())
+    assert.Equal(t, uint64(3), w.State().NumExecs())
 }
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index fa37ac0f..38166b85 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -462,17 +462,17 @@ func Test_NumExecs(t *testing.T) {
     if err != nil {
         t.Errorf("fail to execute payload: error %v", err)
     }
-    assert.Equal(t, int64(1), w.State().NumExecs())
+    assert.Equal(t, uint64(1), w.State().NumExecs())
 
     _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
     if err != nil {
         t.Errorf("fail to execute payload: error %v", err)
     }
-    assert.Equal(t, int64(2), w.State().NumExecs())
+    assert.Equal(t, uint64(2), w.State().NumExecs())
 
     _, err = sw.Exec(payload.Payload{Body: []byte("hello")})
     if err != nil {
         t.Errorf("fail to execute payload: error %v", err)
     }
-    assert.Equal(t, int64(3), w.State().NumExecs())
+    assert.Equal(t, uint64(3), w.State().NumExecs())
 }
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 2c3d512d..753b61ee 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -11,9 +11,9 @@ import (
 )
 
 // workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
     ww := &workerWatcher{
-        stack: NewWorkersStack(uint64(numWorkers)),
+        stack: NewWorkersStack(numWorkers),
         allocator: allocator,
         events: events,
     }
diff --git a/plugins/http/config/http.go b/plugins/http/config/http.go
index a8b1e8e3..bfbc1af6 100644
--- a/plugins/http/config/http.go
+++ b/plugins/http/config/http.go
@@ -72,7 +72,7 @@ func (c *HTTP) InitDefaults() error {
         // default pool
         c.Pool = &poolImpl.Config{
             Debug: false,
-            NumWorkers: int64(runtime.NumCPU()),
+            NumWorkers: uint64(runtime.NumCPU()),
             MaxJobs: 1000,
             AllocateTimeout: time.Second * 60,
             DestroyTimeout: time.Second * 60,
diff --git a/plugins/http/config/ssl.go b/plugins/http/config/ssl.go
index c33dbce4..eb2b72b5 100644
--- a/plugins/http/config/ssl.go
+++ b/plugins/http/config/ssl.go
@@ -23,7 +23,7 @@ type SSL struct {
     Cert string
 
     // Root CA file
-    RootCA string
+    RootCA string `mapstructure:"root_ca"`
 
     // internal
     host string
diff --git a/plugins/server/config.go b/plugins/server/config.go
index 92e6780a..a4b0d91c 100644
--- a/plugins/server/config.go
+++ b/plugins/server/config.go
@@ -26,112 +26,25 @@ type Config struct {
         RelayTimeout time.Duration `mapstructure:"relay_timeout"`
     } `mapstructure:"server"`
 
+    // we just need to know if the section exist, we don't need to read config from it
     RPC *struct {
         Listen string `mapstructure:"listen"`
     } `mapstructure:"rpc"`
     Logs *struct {
-        Mode string `mapstructure:"mode"`
-        Level string `mapstructure:"level"`
     } `mapstructure:"logs"`
     HTTP *struct {
-        Address string `mapstructure:"address"`
-        MaxRequestSize int `mapstructure:"max_request_size"`
-        Middleware []string `mapstructure:"middleware"`
-        Uploads struct {
-            Forbid []string `mapstructure:"forbid"`
-        } `mapstructure:"uploads"`
-        TrustedSubnets []string `mapstructure:"trusted_subnets"`
-        Pool struct {
-            NumWorkers int `mapstructure:"num_workers"`
-            MaxJobs int `mapstructure:"max_jobs"`
-            AllocateTimeout string `mapstructure:"allocate_timeout"`
-            DestroyTimeout string `mapstructure:"destroy_timeout"`
-            Supervisor struct {
-                WatchTick int `mapstructure:"watch_tick"`
-                TTL int `mapstructure:"ttl"`
-                IdleTTL int `mapstructure:"idle_ttl"`
-                ExecTTL int `mapstructure:"exec_ttl"`
-                MaxWorkerMemory int `mapstructure:"max_worker_memory"`
-            } `mapstructure:"supervisor"`
-        } `mapstructure:"pool"`
-        Ssl struct {
-            Port int `mapstructure:"port"`
-            Redirect bool `mapstructure:"redirect"`
-            Cert string `mapstructure:"cert"`
-            Key string `mapstructure:"key"`
-        } `mapstructure:"ssl"`
-        Fcgi struct {
-            Address string `mapstructure:"address"`
-        } `mapstructure:"fcgi"`
-        HTTP2 struct {
-            Enabled bool `mapstructure:"enabled"`
-            H2C bool `mapstructure:"h2c"`
-            MaxConcurrentStreams int `mapstructure:"max_concurrent_streams"`
-        } `mapstructure:"http2"`
     } `mapstructure:"http"`
     Redis *struct {
-        Addrs []string `mapstructure:"addrs"`
-        MasterName string `mapstructure:"master_name"`
-        Username string `mapstructure:"username"`
-        Password string `mapstructure:"password"`
-        DB int `mapstructure:"db"`
-        SentinelPassword string `mapstructure:"sentinel_password"`
-        RouteByLatency bool `mapstructure:"route_by_latency"`
-        RouteRandomly bool `mapstructure:"route_randomly"`
-        DialTimeout int `mapstructure:"dial_timeout"`
-        MaxRetries int `mapstructure:"max_retries"`
-        MinRetryBackoff int `mapstructure:"min_retry_backoff"`
-        MaxRetryBackoff int `mapstructure:"max_retry_backoff"`
-        PoolSize int `mapstructure:"pool_size"`
-        MinIdleConns int `mapstructure:"min_idle_conns"`
-        MaxConnAge int `mapstructure:"max_conn_age"`
-        ReadTimeout int `mapstructure:"read_timeout"`
-        WriteTimeout int `mapstructure:"write_timeout"`
-        PoolTimeout int `mapstructure:"pool_timeout"`
-        IdleTimeout int `mapstructure:"idle_timeout"`
-        IdleCheckFreq int `mapstructure:"idle_check_freq"`
-        ReadOnly bool `mapstructure:"read_only"`
     } `mapstructure:"redis"`
     Boltdb *struct {
-        Dir string `mapstructure:"dir"`
-        File string `mapstructure:"file"`
-        Bucket string `mapstructure:"bucket"`
-        Permissions int `mapstructure:"permissions"`
-        TTL int `mapstructure:"TTL"`
     } `mapstructure:"boltdb"`
     Memcached *struct {
-        Addr []string `mapstructure:"addr"`
     } `mapstructure:"memcached"`
     Memory *struct {
-        Enabled bool `mapstructure:"enabled"`
-        Interval int `mapstructure:"interval"`
     } `mapstructure:"memory"`
     Metrics *struct {
-        Address string `mapstructure:"address"`
-        Collect struct {
-            AppMetric struct {
-                Type string `mapstructure:"type"`
-                Help string `mapstructure:"help"`
-                Labels []string `mapstructure:"labels"`
-                Buckets []float64 `mapstructure:"buckets"`
-                Objectives []struct {
-                    Num2 float64 `mapstructure:"2,omitempty"`
-                    One4 float64 `mapstructure:"1.4,omitempty"`
-                } `mapstructure:"objectives"`
-            } `mapstructure:"app_metric"`
-        } `mapstructure:"collect"`
     } `mapstructure:"metrics"`
     Reload *struct {
-        Interval string `mapstructure:"interval"`
-        Patterns []string `mapstructure:"patterns"`
-        Services struct {
-            HTTP struct {
-                Recursive bool `mapstructure:"recursive"`
-                Ignore []string `mapstructure:"ignore"`
-                Patterns []string `mapstructure:"patterns"`
-                Dirs []string `mapstructure:"dirs"`
-            } `mapstructure:"http"`
-        } `mapstructure:"services"`
     } `mapstructure:"reload"`
 }
diff --git a/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml b/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml
index 55b2857d..ab42f4fc 100644
--- a/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml
+++ b/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml
@@ -26,7 +26,7 @@ http:
     redirect: false
     cert: fixtures/server.crt
     key: fixtures/server.key
-    # rootCa: root.crt
+    # root_ca: root.crt
   fcgi:
     address: tcp://127.0.0.1:6921
   http2:
diff --git a/tests/plugins/http/configs/.rr-fcgi.yaml b/tests/plugins/http/configs/.rr-fcgi.yaml
index 483da057..bd5d01bd 100644
--- a/tests/plugins/http/configs/.rr-fcgi.yaml
+++ b/tests/plugins/http/configs/.rr-fcgi.yaml
@@ -26,7 +26,7 @@ http:
     redirect: false
     cert: fixtures/server.crt
     key: fixtures/server.key
-    # rootCa: root.crt
+    # root_ca: root.crt
   fcgi:
     address: tcp://0.0.0.0:6920
   http2:
diff --git a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
new file mode 100644
index 00000000..3e392577
--- /dev/null
+++ b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
@@ -0,0 +1,33 @@
+rpc:
+  listen: tcp://127.0.0.1:15432
+server:
+  command: "php ../../http/client.php echo pipes"
+  user: ""
+  group: ""
+  env:
+    "RR_HTTP": "true"
+  relay: "pipes"
+  relay_timeout: "20s"
+
+http:
+  debug: true
+  address: 127.0.0.1:18888
+  max_request_size: 1024
+  middleware: [ "" ]
+  uploads:
+    forbid: [ ".php", ".exe", ".bat" ]
+  trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+  pool:
+    num_workers: 1
+    max_jobs: 0
+    allocate_timeout: 60s
+    destroy_timeout: 60s
+    supervisor:
+      watch_tick: 1s
+      ttl: 0
+      idle_ttl: 5s
+      exec_ttl: 10s
+      max_worker_memory: 100
+logs:
+  mode: development
+  level: error
\ No newline at end of file
diff --git a/tests/plugins/http/configs/.rr-init.yaml b/tests/plugins/http/configs/.rr-init.yaml
index 9b7d65c3..77132b43 100644
--- a/tests/plugins/http/configs/.rr-init.yaml
+++ b/tests/plugins/http/configs/.rr-init.yaml
@@ -29,7 +29,7 @@ http:
     redirect: false
     cert: fixtures/server.crt
     key: fixtures/server.key
-    # rootCa: root.crt
+    # root_ca: root.crt
  fcgi:
     address: tcp://0.0.0.0:7921
   http2:
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 45931a49..e47dbd44 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -1800,7 +1800,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
     func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
     pipe.NewPipeFactory(),
     pool.Config{
-        NumWorkers: int64(runtime.NumCPU()),
+        NumWorkers: uint64(runtime.NumCPU()),
         AllocateTimeout: time.Second * 1000,
         DestroyTimeout: time.Second * 1000,
     })
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index 26b28165..4f99dbbb 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -1221,6 +1221,125 @@ func TestHttpBrokenPipes(t *testing.T) {
     assert.Error(t, err)
 }
 
+func TestHTTPSupervisedPool(t *testing.T) {
+    cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+    assert.NoError(t, err)
+
+    cfg := &config.Viper{
+        Path: "configs/.rr-http-supervised-pool.yaml",
+        Prefix: "rr",
+    }
+
+    err = cont.RegisterAll(
+        cfg,
+        &rpcPlugin.Plugin{},
+        &logger.ZapLogger{},
+        &server.Plugin{},
+        &httpPlugin.Plugin{},
+        &informer.Plugin{},
+    )
+    assert.NoError(t, err)
+
+    err = cont.Init()
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    ch, err := cont.Serve()
+    assert.NoError(t, err)
+
+    sig := make(chan os.Signal, 1)
+    signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+    wg := &sync.WaitGroup{}
+    wg.Add(1)
+
+    stopCh := make(chan struct{}, 1)
+
+    go func() {
+        defer wg.Done()
+        for {
+            select {
+            case e := <-ch:
+                assert.Fail(t, "error", e.Error.Error())
+                err = cont.Stop()
+                if err != nil {
+                    assert.FailNow(t, "error", err.Error())
+                }
+            case <-sig:
+                err = cont.Stop()
+                if err != nil {
+                    assert.FailNow(t, "error", err.Error())
+                }
+                return
+            case <-stopCh:
+                // timeout
+                err = cont.Stop()
+                if err != nil {
+                    assert.FailNow(t, "error", err.Error())
+                }
+                return
+            }
+        }
+    }()
+
+    time.Sleep(time.Second * 1)
+    t.Run("HTTPEchoTest", echoHTTP2)
+    // worker should be destructed (idle_ttl)
+    t.Run("HTTPInformerCompareWorkersTest", informerTest2)
+
+    stopCh <- struct{}{}
+    wg.Wait()
+}
+
+func echoHTTP2(t *testing.T) {
+    req, err := http.NewRequest("GET", "http://localhost:18888?hello=world", nil)
+    assert.NoError(t, err)
+
+    r, err := http.DefaultClient.Do(req)
+    assert.NoError(t, err)
+    b, err := ioutil.ReadAll(r.Body)
+    assert.NoError(t, err)
+    assert.Equal(t, 201, r.StatusCode)
+    assert.Equal(t, "WORLD", string(b))
+
+    err = r.Body.Close()
+    assert.NoError(t, err)
+}
+
+// get worker
+// sleep
+// supervisor destroy worker
+// compare pid's
+func informerTest2(t *testing.T) {
+    conn, err := net.Dial("tcp", "127.0.0.1:15432")
+    assert.NoError(t, err)
+    client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+    pid := 0
+    // WorkerList contains list of workers.
+    list := struct {
+        // Workers is list of workers.
+        Workers []tools.ProcessState `json:"workers"`
+    }{}
+
+    err = client.Call("informer.Workers", "http", &list)
+    assert.NoError(t, err)
+    assert.Len(t, list.Workers, 1)
+    // save the pid
+    pid = list.Workers[0].Pid
+    time.Sleep(time.Second * 10)
+
+    list = struct {
+        // Workers is list of workers.
+        Workers []tools.ProcessState `json:"workers"`
+    }{}
+
+    err = client.Call("informer.Workers", "http", &list)
+    assert.NoError(t, err)
+    assert.Len(t, list.Workers, 1)
+    assert.NotEqual(t, list.Workers[0].Pid, pid)
+}
+
 func get(url string) (string, *http.Response, error) {
     r, err := http.Get(url) //nolint:gosec
     if err != nil {
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 7436a7fb..2300de89 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -16,10 +16,10 @@ var testPoolConfig = pool.Config{
     AllocateTimeout: time.Second * 10,
     DestroyTimeout: time.Second * 10,
     Supervisor: &pool.SupervisorConfig{
-        WatchTick: 60,
-        TTL: 1000,
-        IdleTTL: 10,
-        ExecTTL: 10,
+        WatchTick: 60 * time.Second,
+        TTL: 1000 * time.Second,
+        IdleTTL: 10 * time.Second,
+        ExecTTL: 10 * time.Second,
         MaxWorkerMemory: 1000,
     },
 }
diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go
index 7d53bca0..61942516 100644
--- a/tests/plugins/resetter/test_plugin.go
+++ b/tests/plugins/resetter/test_plugin.go
@@ -15,10 +15,10 @@ var testPoolConfig = poolImpl.Config{
     AllocateTimeout: time.Second * 10,
     DestroyTimeout: time.Second * 10,
     Supervisor: &poolImpl.SupervisorConfig{
-        WatchTick: 60,
-        TTL: 1000,
-        IdleTTL: 10,
-        ExecTTL: 10,
+        WatchTick: 60 * time.Second,
+        TTL: 1000 * time.Second,
+        IdleTTL: 10 * time.Second,
+        ExecTTL: 10 * time.Second,
         MaxWorkerMemory: 1000,
     },
 }
diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go
index 5b2cdd96..af34b4d3 100644
--- a/tests/plugins/server/plugin_pipes.go
+++ b/tests/plugins/server/plugin_pipes.go
@@ -21,10 +21,10 @@ var testPoolConfig = pool.Config{
     AllocateTimeout: time.Second * 10,
     DestroyTimeout: time.Second * 10,
     Supervisor: &pool.SupervisorConfig{
-        WatchTick: 60,
-        TTL: 1000,
-        IdleTTL: 10,
-        ExecTTL: 10,
+        WatchTick: 60 * time.Second,
+        TTL: 1000 * time.Second,
+        IdleTTL: 10 * time.Second,
+        ExecTTL: 10 * time.Second,
         MaxWorkerMemory: 1000,
     },
 }
diff --git a/tools/process.go b/tools/process.go
index a01f2b24..a6eb1139 100644
--- a/tools/process.go
+++ b/tools/process.go
@@ -15,7 +15,7 @@ type ProcessState struct {
     Status string `json:"status"`
 
     // Number of worker executions.
-    NumJobs int64 `json:"numExecs"`
+    NumJobs uint64 `json:"numExecs"`
 
     // Created is unix nano timestamp of worker creation time.
     Created int64 `json:"created"`
diff --git a/tools/worker_table.go b/tools/worker_table.go
index 4aeb6ae7..20b8084f 100644
--- a/tools/worker_table.go
+++ b/tools/worker_table.go
@@ -52,8 +52,9 @@ func renderStatus(status string) string {
     return status
 }
 
-func renderJobs(number int64) string {
-    return humanize.Comma(number)
+func renderJobs(number uint64) string {
+    // TODO overflow
+    return humanize.Comma(int64(number))
 }
 
 func renderAlive(t time.Time) string {