summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.rr.yaml21
-rwxr-xr-xinternal/state.go10
-rw-r--r--pkg/pool/config.go18
-rwxr-xr-xpkg/pool/static_pool.go4
-rwxr-xr-xpkg/pool/static_pool_test.go4
-rwxr-xr-xpkg/pool/supervisor_pool.go8
-rw-r--r--pkg/pool/supervisor_test.go50
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go6
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go6
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go4
-rw-r--r--plugins/http/config/http.go2
-rw-r--r--plugins/http/config/ssl.go2
-rw-r--r--plugins/server/config.go89
-rw-r--r--tests/plugins/http/configs/.rr-fcgi-reqUri.yaml2
-rw-r--r--tests/plugins/http/configs/.rr-fcgi.yaml2
-rw-r--r--tests/plugins/http/configs/.rr-http-supervised-pool.yaml33
-rw-r--r--tests/plugins/http/configs/.rr-init.yaml2
-rw-r--r--tests/plugins/http/handler_test.go2
-rw-r--r--tests/plugins/http/http_plugin_test.go119
-rw-r--r--tests/plugins/informer/test_plugin.go8
-rw-r--r--tests/plugins/resetter/test_plugin.go8
-rw-r--r--tests/plugins/server/plugin_pipes.go8
-rw-r--r--tools/process.go2
-rw-r--r--tools/worker_table.go5
24 files changed, 240 insertions, 175 deletions
diff --git a/.rr.yaml b/.rr.yaml
index 600f0255..06e452ce 100755
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -9,7 +9,7 @@ server:
"RR_HTTP": "true"
"RR_RPC": "tcp://127.0.0.1:6001"
relay: "pipes"
- relay_timeout: "20s"
+ relay_timeout: 20s
logs:
mode: development
@@ -44,7 +44,6 @@ http:
input: "custom-header"
response:
output: "output-header"
-
static:
dir: "tests"
forbid: [ "" ]
@@ -59,15 +58,15 @@ http:
allocate_timeout: 60s
destroy_timeout: 60s
supervisor:
- # WatchTick defines how often to check the state of worker (seconds)
- watch_tick: 1
- # TTL defines maximum time worker is allowed to live (seconds)
+ # watch_tick defines how often to check the state of the workers (seconds)
+ watch_tick: 1s
+ # ttl defines maximum time worker is allowed to live (seconds)
ttl: 0
- # IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0 (seconds)
- idle_ttl: 10
- # ExecTTL defines maximum lifetime per job (seconds)
- exec_ttl: 10
- # MaxWorkerMemory limits memory per worker (MB)
+ # idle_ttl defines maximum duration worker can spend in idle mode after first use. Disabled when 0 (seconds)
+ idle_ttl: 10s
+ # exec_ttl defines maximum lifetime per job (seconds)
+ exec_ttl: 10s
+ # max_worker_memory limits memory usage per worker (MB)
max_worker_memory: 100
ssl:
@@ -76,7 +75,7 @@ http:
redirect: false
cert: fixtures/server.crt
key: fixtures/server.key
- rootCa: root.crt
+ root_ca: root.crt
fcgi:
address: tcp://0.0.0.0:7921
http2:
diff --git a/internal/state.go b/internal/state.go
index 8f7d939b..a14a6937 100755
--- a/internal/state.go
+++ b/internal/state.go
@@ -13,7 +13,7 @@ type State interface {
// Set sets the WorkerState
Set(value int64)
// NumJobs shows how many times WorkerProcess was invoked
- NumExecs() int64
+ NumExecs() uint64
// IsActive returns true if WorkerProcess not Inactive or Stopped
IsActive() bool
// RegisterExec using to registering php executions
@@ -56,7 +56,7 @@ const (
type WorkerState struct {
value int64
- numExecs int64
+ numExecs uint64
// to be lightweight, use UnixNano
lastUsed uint64
}
@@ -87,8 +87,8 @@ func (s *WorkerState) String() string {
}
// NumExecs returns number of registered WorkerProcess execs.
-func (s *WorkerState) NumExecs() int64 {
- return atomic.LoadInt64(&s.numExecs)
+func (s *WorkerState) NumExecs() uint64 {
+ return atomic.LoadUint64(&s.numExecs)
}
// Value WorkerState returns WorkerState value
@@ -109,7 +109,7 @@ func (s *WorkerState) Set(value int64) {
// register new execution atomically
func (s *WorkerState) RegisterExec() {
- atomic.AddInt64(&s.numExecs, 1)
+ atomic.AddUint64(&s.numExecs, 1)
}
// Update last used time
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index cf4aaaee..782f7ce9 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -12,12 +12,12 @@ type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
- NumWorkers int64 `mapstructure:"num_workers"`
+ NumWorkers uint64 `mapstructure:"num_workers"`
// MaxJobs defines how many executions is allowed for the worker until
// it's destruction. set 1 to create new process for each new task, 0 to let
// worker handle as many tasks as it can.
- MaxJobs int64 `mapstructure:"max_jobs"`
+ MaxJobs uint64 `mapstructure:"max_jobs"`
// AllocateTimeout defines for how long pool will be waiting for a worker to
// be freed to handle the task. Defaults to 60s.
@@ -34,7 +34,7 @@ type Config struct {
// InitDefaults enables default config values.
func (cfg *Config) InitDefaults() {
if cfg.NumWorkers == 0 {
- cfg.NumWorkers = int64(runtime.NumCPU())
+ cfg.NumWorkers = uint64(runtime.NumCPU())
}
if cfg.AllocateTimeout == 0 {
@@ -52,24 +52,24 @@ func (cfg *Config) InitDefaults() {
type SupervisorConfig struct {
// WatchTick defines how often to check the state of worker.
- WatchTick uint64
+ WatchTick time.Duration `mapstructure:"watch_tick"`
// TTL defines maximum time worker is allowed to live.
- TTL uint64
+ TTL time.Duration `mapstructure:"ttl"`
// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL uint64
+ IdleTTL time.Duration `mapstructure:"idle_ttl"`
// ExecTTL defines maximum lifetime per job.
- ExecTTL uint64
+ ExecTTL time.Duration `mapstructure:"exec_ttl"`
// MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64
+ MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
}
// InitDefaults enables default config values.
func (cfg *SupervisorConfig) InitDefaults() {
if cfg.WatchTick == 0 {
- cfg.WatchTick = 1
+ cfg.WatchTick = time.Second
}
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 7f66eaac..44adf9c0 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -310,12 +310,12 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.SyncWorker, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.SyncWorker, error) {
const op = errors.Op("allocate workers")
var workers []worker.SyncWorker
// constant number of stack simplify logic
- for i := int64(0); i < numWorkers; i++ {
+ for i := uint64(0); i < numWorkers; i++ {
w, err := sp.allocator()
if err != nil {
return nil, errors.E(op, errors.WorkerAllocate, err)
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index a877b28f..a32790e0 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -20,7 +20,7 @@ import (
)
var cfg = Config{
- NumWorkers: int64(runtime.NumCPU()),
+ NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
}
@@ -596,7 +596,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
Config{
- NumWorkers: int64(runtime.NumCPU()),
+ NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
},
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 583d05b4..2597b352 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -56,7 +56,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
c := make(chan ttlExec, 1)
- ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second)
+ ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
defer cancel()
go func() {
res, err := sp.pool.ExecWithContext(ctx, rqs)
@@ -116,7 +116,7 @@ func (sp *supervised) Destroy(ctx context.Context) {
func (sp *supervised) Start() {
go func() {
- watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second)
+ watchTout := time.NewTicker(sp.cfg.WatchTick)
for {
select {
case <-sp.stopCh:
@@ -154,7 +154,7 @@ func (sp *supervised) control() {
continue
}
- if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
@@ -209,7 +209,7 @@ func (sp *supervised) control() {
// IdleTTL is 1 second.
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
- if int64(sp.cfg.IdleTTL)-res <= 0 {
+ if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index 73ab4927..c67d5d91 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -14,14 +14,14 @@ import (
)
var cfgSupervised = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 100,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 100 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -74,14 +74,14 @@ func TestSupervisedPool_Exec(t *testing.T) {
func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 1,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 1 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -115,14 +115,14 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
func TestSupervisedPool_Idle(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 1,
- ExecTTL: 100,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 1 * time.Second,
+ ExecTTL: 100 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -156,14 +156,14 @@ func TestSupervisedPool_Idle(t *testing.T) {
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 4,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
MaxWorkerMemory: 100,
},
}
@@ -198,14 +198,14 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
var cfgExecTTL = Config{
- NumWorkers: int64(1),
+ NumWorkers: uint64(1),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
Supervisor: &SupervisorConfig{
- WatchTick: 1,
- TTL: 100,
- IdleTTL: 100,
- ExecTTL: 4,
+ WatchTick: 1 * time.Second,
+ TTL: 100 * time.Second,
+ IdleTTL: 100 * time.Second,
+ ExecTTL: 4 * time.Second,
MaxWorkerMemory: 1,
},
}
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index 2e5bbcd5..d4949c82 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -440,17 +440,17 @@ func Test_NumExecs2(t *testing.T) {
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(1), w.State().NumExecs())
+ assert.Equal(t, uint64(1), w.State().NumExecs())
_, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(2), w.State().NumExecs())
+ assert.Equal(t, uint64(2), w.State().NumExecs())
_, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(3), w.State().NumExecs())
+ assert.Equal(t, uint64(3), w.State().NumExecs())
}
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index fa37ac0f..38166b85 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -462,17 +462,17 @@ func Test_NumExecs(t *testing.T) {
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(1), w.State().NumExecs())
+ assert.Equal(t, uint64(1), w.State().NumExecs())
_, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(2), w.State().NumExecs())
+ assert.Equal(t, uint64(2), w.State().NumExecs())
_, err = sw.Exec(payload.Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
- assert.Equal(t, int64(3), w.State().NumExecs())
+ assert.Equal(t, uint64(3), w.State().NumExecs())
}
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 2c3d512d..753b61ee 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -11,9 +11,9 @@ import (
)
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers int64, events events.Handler) Watcher {
+func NewSyncWorkerWatcher(allocator worker.Allocator, numWorkers uint64, events events.Handler) Watcher {
ww := &workerWatcher{
- stack: NewWorkersStack(uint64(numWorkers)),
+ stack: NewWorkersStack(numWorkers),
allocator: allocator,
events: events,
}
diff --git a/plugins/http/config/http.go b/plugins/http/config/http.go
index a8b1e8e3..bfbc1af6 100644
--- a/plugins/http/config/http.go
+++ b/plugins/http/config/http.go
@@ -72,7 +72,7 @@ func (c *HTTP) InitDefaults() error {
// default pool
c.Pool = &poolImpl.Config{
Debug: false,
- NumWorkers: int64(runtime.NumCPU()),
+ NumWorkers: uint64(runtime.NumCPU()),
MaxJobs: 1000,
AllocateTimeout: time.Second * 60,
DestroyTimeout: time.Second * 60,
diff --git a/plugins/http/config/ssl.go b/plugins/http/config/ssl.go
index c33dbce4..eb2b72b5 100644
--- a/plugins/http/config/ssl.go
+++ b/plugins/http/config/ssl.go
@@ -23,7 +23,7 @@ type SSL struct {
Cert string
// Root CA file
- RootCA string
+ RootCA string `mapstructure:"root_ca"`
// internal
host string
diff --git a/plugins/server/config.go b/plugins/server/config.go
index 92e6780a..a4b0d91c 100644
--- a/plugins/server/config.go
+++ b/plugins/server/config.go
@@ -26,112 +26,25 @@ type Config struct {
RelayTimeout time.Duration `mapstructure:"relay_timeout"`
} `mapstructure:"server"`
+ // we just need to know if the section exist, we don't need to read config from it
RPC *struct {
Listen string `mapstructure:"listen"`
} `mapstructure:"rpc"`
Logs *struct {
- Mode string `mapstructure:"mode"`
- Level string `mapstructure:"level"`
} `mapstructure:"logs"`
HTTP *struct {
- Address string `mapstructure:"address"`
- MaxRequestSize int `mapstructure:"max_request_size"`
- Middleware []string `mapstructure:"middleware"`
- Uploads struct {
- Forbid []string `mapstructure:"forbid"`
- } `mapstructure:"uploads"`
- TrustedSubnets []string `mapstructure:"trusted_subnets"`
- Pool struct {
- NumWorkers int `mapstructure:"num_workers"`
- MaxJobs int `mapstructure:"max_jobs"`
- AllocateTimeout string `mapstructure:"allocate_timeout"`
- DestroyTimeout string `mapstructure:"destroy_timeout"`
- Supervisor struct {
- WatchTick int `mapstructure:"watch_tick"`
- TTL int `mapstructure:"ttl"`
- IdleTTL int `mapstructure:"idle_ttl"`
- ExecTTL int `mapstructure:"exec_ttl"`
- MaxWorkerMemory int `mapstructure:"max_worker_memory"`
- } `mapstructure:"supervisor"`
- } `mapstructure:"pool"`
- Ssl struct {
- Port int `mapstructure:"port"`
- Redirect bool `mapstructure:"redirect"`
- Cert string `mapstructure:"cert"`
- Key string `mapstructure:"key"`
- } `mapstructure:"ssl"`
- Fcgi struct {
- Address string `mapstructure:"address"`
- } `mapstructure:"fcgi"`
- HTTP2 struct {
- Enabled bool `mapstructure:"enabled"`
- H2C bool `mapstructure:"h2c"`
- MaxConcurrentStreams int `mapstructure:"max_concurrent_streams"`
- } `mapstructure:"http2"`
} `mapstructure:"http"`
Redis *struct {
- Addrs []string `mapstructure:"addrs"`
- MasterName string `mapstructure:"master_name"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- DB int `mapstructure:"db"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- DialTimeout int `mapstructure:"dial_timeout"`
- MaxRetries int `mapstructure:"max_retries"`
- MinRetryBackoff int `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff int `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge int `mapstructure:"max_conn_age"`
- ReadTimeout int `mapstructure:"read_timeout"`
- WriteTimeout int `mapstructure:"write_timeout"`
- PoolTimeout int `mapstructure:"pool_timeout"`
- IdleTimeout int `mapstructure:"idle_timeout"`
- IdleCheckFreq int `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
} `mapstructure:"redis"`
Boltdb *struct {
- Dir string `mapstructure:"dir"`
- File string `mapstructure:"file"`
- Bucket string `mapstructure:"bucket"`
- Permissions int `mapstructure:"permissions"`
- TTL int `mapstructure:"TTL"`
} `mapstructure:"boltdb"`
Memcached *struct {
- Addr []string `mapstructure:"addr"`
} `mapstructure:"memcached"`
Memory *struct {
- Enabled bool `mapstructure:"enabled"`
- Interval int `mapstructure:"interval"`
} `mapstructure:"memory"`
Metrics *struct {
- Address string `mapstructure:"address"`
- Collect struct {
- AppMetric struct {
- Type string `mapstructure:"type"`
- Help string `mapstructure:"help"`
- Labels []string `mapstructure:"labels"`
- Buckets []float64 `mapstructure:"buckets"`
- Objectives []struct {
- Num2 float64 `mapstructure:"2,omitempty"`
- One4 float64 `mapstructure:"1.4,omitempty"`
- } `mapstructure:"objectives"`
- } `mapstructure:"app_metric"`
- } `mapstructure:"collect"`
} `mapstructure:"metrics"`
Reload *struct {
- Interval string `mapstructure:"interval"`
- Patterns []string `mapstructure:"patterns"`
- Services struct {
- HTTP struct {
- Recursive bool `mapstructure:"recursive"`
- Ignore []string `mapstructure:"ignore"`
- Patterns []string `mapstructure:"patterns"`
- Dirs []string `mapstructure:"dirs"`
- } `mapstructure:"http"`
- } `mapstructure:"services"`
} `mapstructure:"reload"`
}
diff --git a/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml b/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml
index 55b2857d..ab42f4fc 100644
--- a/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml
+++ b/tests/plugins/http/configs/.rr-fcgi-reqUri.yaml
@@ -26,7 +26,7 @@ http:
redirect: false
cert: fixtures/server.crt
key: fixtures/server.key
- # rootCa: root.crt
+ # root_ca: root.crt
fcgi:
address: tcp://127.0.0.1:6921
http2:
diff --git a/tests/plugins/http/configs/.rr-fcgi.yaml b/tests/plugins/http/configs/.rr-fcgi.yaml
index 483da057..bd5d01bd 100644
--- a/tests/plugins/http/configs/.rr-fcgi.yaml
+++ b/tests/plugins/http/configs/.rr-fcgi.yaml
@@ -26,7 +26,7 @@ http:
redirect: false
cert: fixtures/server.crt
key: fixtures/server.key
- # rootCa: root.crt
+ # root_ca: root.crt
fcgi:
address: tcp://0.0.0.0:6920
http2:
diff --git a/tests/plugins/http/configs/.rr-http-supervised-pool.yaml b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
new file mode 100644
index 00000000..3e392577
--- /dev/null
+++ b/tests/plugins/http/configs/.rr-http-supervised-pool.yaml
@@ -0,0 +1,33 @@
+rpc:
+ listen: tcp://127.0.0.1:15432
+server:
+ command: "php ../../http/client.php echo pipes"
+ user: ""
+ group: ""
+ env:
+ "RR_HTTP": "true"
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ debug: true
+ address: 127.0.0.1:18888
+ max_request_size: 1024
+ middleware: [ "" ]
+ uploads:
+ forbid: [ ".php", ".exe", ".bat" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 1
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+ supervisor:
+ watch_tick: 1s
+ ttl: 0
+ idle_ttl: 5s
+ exec_ttl: 10s
+ max_worker_memory: 100
+logs:
+ mode: development
+ level: error \ No newline at end of file
diff --git a/tests/plugins/http/configs/.rr-init.yaml b/tests/plugins/http/configs/.rr-init.yaml
index 9b7d65c3..77132b43 100644
--- a/tests/plugins/http/configs/.rr-init.yaml
+++ b/tests/plugins/http/configs/.rr-init.yaml
@@ -29,7 +29,7 @@ http:
redirect: false
cert: fixtures/server.crt
key: fixtures/server.key
- # rootCa: root.crt
+ # root_ca: root.crt
fcgi:
address: tcp://0.0.0.0:7921
http2:
diff --git a/tests/plugins/http/handler_test.go b/tests/plugins/http/handler_test.go
index 45931a49..e47dbd44 100644
--- a/tests/plugins/http/handler_test.go
+++ b/tests/plugins/http/handler_test.go
@@ -1800,7 +1800,7 @@ func BenchmarkHandler_Listen_Echo(b *testing.B) {
func() *exec.Cmd { return exec.Command("php", "../../http/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
pool.Config{
- NumWorkers: int64(runtime.NumCPU()),
+ NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 1000,
DestroyTimeout: time.Second * 1000,
})
diff --git a/tests/plugins/http/http_plugin_test.go b/tests/plugins/http/http_plugin_test.go
index 26b28165..4f99dbbb 100644
--- a/tests/plugins/http/http_plugin_test.go
+++ b/tests/plugins/http/http_plugin_test.go
@@ -1221,6 +1221,125 @@ func TestHttpBrokenPipes(t *testing.T) {
assert.Error(t, err)
}
+func TestHTTPSupervisedPool(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-http-supervised-pool.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &httpPlugin.Plugin{},
+ &informer.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("HTTPEchoTest", echoHTTP2)
+ // worker should be destructed (idle_ttl)
+ t.Run("HTTPInformerCompareWorkersTest", informerTest2)
+
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func echoHTTP2(t *testing.T) {
+ req, err := http.NewRequest("GET", "http://localhost:18888?hello=world", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ b, err := ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+ assert.Equal(t, 201, r.StatusCode)
+ assert.Equal(t, "WORLD", string(b))
+
+ err = r.Body.Close()
+ assert.NoError(t, err)
+}
+
+// get worker
+// sleep
+// supervisor destroy worker
+// compare pid's
+func informerTest2(t *testing.T) {
+ conn, err := net.Dial("tcp", "127.0.0.1:15432")
+ assert.NoError(t, err)
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+ pid := 0
+ // WorkerList contains list of workers.
+ list := struct {
+ // Workers is list of workers.
+ Workers []tools.ProcessState `json:"workers"`
+ }{}
+
+ err = client.Call("informer.Workers", "http", &list)
+ assert.NoError(t, err)
+ assert.Len(t, list.Workers, 1)
+ // save the pid
+ pid = list.Workers[0].Pid
+ time.Sleep(time.Second * 10)
+
+ list = struct {
+ // Workers is list of workers.
+ Workers []tools.ProcessState `json:"workers"`
+ }{}
+
+ err = client.Call("informer.Workers", "http", &list)
+ assert.NoError(t, err)
+ assert.Len(t, list.Workers, 1)
+ assert.NotEqual(t, list.Workers[0].Pid, pid)
+}
+
func get(url string) (string, *http.Response, error) {
r, err := http.Get(url) //nolint:gosec
if err != nil {
diff --git a/tests/plugins/informer/test_plugin.go b/tests/plugins/informer/test_plugin.go
index 7436a7fb..2300de89 100644
--- a/tests/plugins/informer/test_plugin.go
+++ b/tests/plugins/informer/test_plugin.go
@@ -16,10 +16,10 @@ var testPoolConfig = pool.Config{
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
Supervisor: &pool.SupervisorConfig{
- WatchTick: 60,
- TTL: 1000,
- IdleTTL: 10,
- ExecTTL: 10,
+ WatchTick: 60 * time.Second,
+ TTL: 1000 * time.Second,
+ IdleTTL: 10 * time.Second,
+ ExecTTL: 10 * time.Second,
MaxWorkerMemory: 1000,
},
}
diff --git a/tests/plugins/resetter/test_plugin.go b/tests/plugins/resetter/test_plugin.go
index 7d53bca0..61942516 100644
--- a/tests/plugins/resetter/test_plugin.go
+++ b/tests/plugins/resetter/test_plugin.go
@@ -15,10 +15,10 @@ var testPoolConfig = poolImpl.Config{
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
Supervisor: &poolImpl.SupervisorConfig{
- WatchTick: 60,
- TTL: 1000,
- IdleTTL: 10,
- ExecTTL: 10,
+ WatchTick: 60 * time.Second,
+ TTL: 1000 * time.Second,
+ IdleTTL: 10 * time.Second,
+ ExecTTL: 10 * time.Second,
MaxWorkerMemory: 1000,
},
}
diff --git a/tests/plugins/server/plugin_pipes.go b/tests/plugins/server/plugin_pipes.go
index 5b2cdd96..af34b4d3 100644
--- a/tests/plugins/server/plugin_pipes.go
+++ b/tests/plugins/server/plugin_pipes.go
@@ -21,10 +21,10 @@ var testPoolConfig = pool.Config{
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
Supervisor: &pool.SupervisorConfig{
- WatchTick: 60,
- TTL: 1000,
- IdleTTL: 10,
- ExecTTL: 10,
+ WatchTick: 60 * time.Second,
+ TTL: 1000 * time.Second,
+ IdleTTL: 10 * time.Second,
+ ExecTTL: 10 * time.Second,
MaxWorkerMemory: 1000,
},
}
diff --git a/tools/process.go b/tools/process.go
index a01f2b24..a6eb1139 100644
--- a/tools/process.go
+++ b/tools/process.go
@@ -15,7 +15,7 @@ type ProcessState struct {
Status string `json:"status"`
// Number of worker executions.
- NumJobs int64 `json:"numExecs"`
+ NumJobs uint64 `json:"numExecs"`
// Created is unix nano timestamp of worker creation time.
Created int64 `json:"created"`
diff --git a/tools/worker_table.go b/tools/worker_table.go
index 4aeb6ae7..20b8084f 100644
--- a/tools/worker_table.go
+++ b/tools/worker_table.go
@@ -52,8 +52,9 @@ func renderStatus(status string) string {
return status
}
-func renderJobs(number int64) string {
- return humanize.Comma(number)
+func renderJobs(number uint64) string {
+ // TODO overflow
+ return humanize.Comma(int64(number))
}
func renderAlive(t time.Time) string {