summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.rr.yaml49
-rw-r--r--codecov.yml9
-rw-r--r--pkg/pool/config.go20
-rwxr-xr-xpkg/pool/supervisor_pool.go27
-rw-r--r--pkg/pool/supervisor_test.go41
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go3
-rw-r--r--plugins/config/interface.go6
-rwxr-xr-xplugins/config/plugin.go9
-rw-r--r--plugins/http/config.go6
-rw-r--r--plugins/kv/memcached/plugin_unit_test.go (renamed from plugins/kv/memcached/storage_test.go)0
-rw-r--r--plugins/kv/memory/plugin_unit_test.go (renamed from plugins/kv/memory/storage_test.go)0
-rw-r--r--plugins/rpc/config.go3
-rw-r--r--plugins/rpc/plugin.go27
-rw-r--r--plugins/server/config.go155
-rw-r--r--plugins/server/plugin.go48
-rwxr-xr-xtests/plugins/config/.rr.yaml3
-rwxr-xr-xtests/plugins/config/config_test.go6
-rwxr-xr-xtests/plugins/config/plugin1.go47
-rw-r--r--tests/plugins/kv/boltdb/plugin_test.go2
-rw-r--r--tests/plugins/kv/memcached/plugin_test.go2
-rw-r--r--tests/plugins/kv/memory/plugin_test.go2
-rw-r--r--tests/plugins/rpc/configs/.rr-rpc-disabled.yaml3
-rw-r--r--tests/plugins/rpc/configs/.rr.yaml1
23 files changed, 360 insertions, 109 deletions
diff --git a/.rr.yaml b/.rr.yaml
index 17132c8a..18087c2d 100755
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -1,6 +1,5 @@
rpc:
listen: tcp://127.0.0.1:6001
- disabled: false
server:
command: "php tests/psr-worker-bench.php"
@@ -8,6 +7,7 @@ server:
group: ""
env:
"RR_HTTP": "true"
+ "RR_RPC": "tcp://127.0.0.1:6001"
relay: "pipes"
relayTimeout: "20s"
@@ -17,11 +17,11 @@ logs:
http:
address: 127.0.0.1:44933
- maxRequestSize: 1024
+ max_request_size: 1024
middleware: ["gzip", "headers"]
uploads:
forbid: [".php", ".exe", ".bat"]
- trustedSubnets:
+ trusted_subnets:
[
"10.0.0.0/8",
"127.0.0.0/8",
@@ -32,23 +32,34 @@ http:
"fe80::/10",
]
pool:
- numWorkers: 6
- maxJobs: 0
- allocateTimeout: 60s
- destroyTimeout: 60s
+ num_workers: 6
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+ supervisor:
+ # WatchTick defines how often to check the state of worker (seconds)
+ watch_tick: 10
+ # TTL defines maximum time worker is allowed to live (seconds)
+ ttl: 10
+ # IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0 (seconds)
+ idle_ttl: 10
+ # ExecTTL defines maximum lifetime per job (seconds)
+ exec_ttl: 10
+ # MaxWorkerMemory limits memory per worker (MB)
+ max_worker_memory: 100
-# ssl:
-# port: 8892
-# redirect: false
-# cert: fixtures/server.crt
-# key: fixtures/server.key
-# # rootCa: root.crt
-# fcgi:
-# address: tcp://0.0.0.0:7921
-# http2:
-# enabled: false
-# h2c: false
-# maxConcurrentStreams: 128
+ ssl:
+ port: 8892
+ redirect: false
+ cert: fixtures/server.crt
+ key: fixtures/server.key
+ # rootCa: root.crt
+ fcgi:
+ address: tcp://0.0.0.0:7921
+ http2:
+ enabled: false
+ h2c: false
+ max_concurrent_streams: 128
redis:
# UniversalClient is an abstract client which - based on the provided options -
diff --git a/codecov.yml b/codecov.yml
index 75e92a5f..eb499c3a 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -9,4 +9,11 @@ coverage:
default:
target: auto
threshold: 0%
- informational: true \ No newline at end of file
+ informational: true
+
+# do not include tests folders
+ignore:
+ - "tests"
+ - "plugins/kv/boltdb/plugin_unit_test.go"
+ - "plugins/kv/memcached/plugin_unit_test.go"
+ - "plugins/kv/memory/plugin_unit_test.go" \ No newline at end of file
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 3dcc3584..acdd3d6f 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -12,23 +12,23 @@ type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
- NumWorkers int64
+ NumWorkers int64 `yaml:"num_workers"`
// MaxJobs defines how many executions is allowed for the worker until
// it's destruction. set 1 to create new process for each new task, 0 to let
// worker handle as many tasks as it can.
- MaxJobs int64
+ MaxJobs int64 `yaml:"max_jobs"`
// AllocateTimeout defines for how long pool will be waiting for a worker to
// be freed to handle the task. Defaults to 60s.
- AllocateTimeout time.Duration
+ AllocateTimeout time.Duration `yaml:"allocate_timeout"`
// DestroyTimeout defines for how long pool should be waiting for worker to
// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
- DestroyTimeout time.Duration
+ DestroyTimeout time.Duration `yaml:"destroy_timeout"`
// Supervision config to limit worker and pool memory usage.
- Supervisor *SupervisorConfig
+ Supervisor *SupervisorConfig `yaml:"supervisor"`
}
// InitDefaults enables default config values.
@@ -52,19 +52,19 @@ func (cfg *Config) InitDefaults() {
type SupervisorConfig struct {
// WatchTick defines how often to check the state of worker.
- WatchTick uint64
+ WatchTick uint64 `yaml:"watch_tick"`
// TTL defines maximum time worker is allowed to live.
- TTL uint64
+ TTL uint64 `yaml:"ttl"`
// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL uint64
+ IdleTTL uint64 `yaml:"idle_ttl"`
// ExecTTL defines maximum lifetime per job.
- ExecTTL uint64
+ ExecTTL uint64 `yaml:"exec_ttl"`
// MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64
+ MaxWorkerMemory uint64 `yaml:"max_worker_memory"`
}
// InitDefaults enables default config values.
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 378be7dd..07fa7019 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -16,6 +16,9 @@ import (
const MB = 1024 * 1024
+// NSEC_IN_SEC nanoseconds in second
+const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck
+
type Supervised interface {
pool.Pool
// Start used to start watching process for all pool workers
@@ -54,7 +57,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
c := make(chan ttlExec, 1)
- ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(sp.cfg.ExecTTL))
+ ctx, cancel := context.WithTimeout(ctx, time.Duration(sp.cfg.ExecTTL)*time.Second)
defer cancel()
go func() {
res, err := sp.pool.ExecWithContext(ctx, rqs)
@@ -114,7 +117,7 @@ func (sp *supervised) Destroy(ctx context.Context) {
func (sp *supervised) Start() {
go func() {
- watchTout := time.NewTicker(time.Second * time.Duration(sp.cfg.WatchTick))
+ watchTout := time.NewTicker(time.Duration(sp.cfg.WatchTick) * time.Second)
for {
select {
case <-sp.stopCh:
@@ -186,14 +189,28 @@ func (sp *supervised) control() {
we are guessing that worker overlap idle time and has to be killed
*/
+ // 1610530005534416045 lu
+ // lu - now = -7811150814 - nanoseconds
+ // 7.8 seconds
// get last used unix nano
lu := workers[i].State().LastUsed()
+ // worker not used, skip
+ if lu == 0 {
+ continue
+ }
- // convert last used to unixNano and sub time.now
- res := int64(lu) - now.UnixNano()
+ // convert last used to unixNano and sub time.now to seconds
+ // negative number, because lu always in the past, except for the `back to the future` :)
+ res := ((int64(lu) - now.UnixNano()) / NSEC_IN_SEC) * -1
// maxWorkerIdle more than diff between now and last used
- if sp.cfg.IdleTTL-uint64(res) <= 0 {
+ // for example:
+ // After exec worker goes to the rest
+ // And resting for the 5 seconds
+ // IdleTTL is 1 second.
+ // After the control check, res will be 5, idle is 1
+ // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
+ if int64(sp.cfg.IdleTTL)-res <= 0 {
err = sp.pool.RemoveWorker(workers[i])
if err != nil {
sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index cb67ebe1..72226bee 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -112,6 +112,47 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
assert.NotEqual(t, pid, p.Workers()[0].Pid())
}
+func TestSupervisedPool_Idle(t *testing.T) {
+ var cfgExecTTL = Config{
+ NumWorkers: int64(1),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ Supervisor: &SupervisorConfig{
+ WatchTick: 1,
+ TTL: 100,
+ IdleTTL: 1,
+ ExecTTL: 100,
+ MaxWorkerMemory: 100,
+ },
+ }
+ ctx := context.Background()
+ p, err := Initialize(
+ ctx,
+ func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
+ pipe.NewPipeFactory(),
+ cfgExecTTL,
+ )
+
+ assert.NoError(t, err)
+ assert.NotNil(t, p)
+ defer p.Destroy(context.Background())
+
+ pid := p.Workers()[0].Pid()
+
+ resp, err := p.ExecWithContext(context.Background(), payload.Payload{
+ Context: []byte(""),
+ Body: []byte("foo"),
+ })
+
+ assert.Nil(t, err)
+ assert.Empty(t, resp.Body)
+ assert.Empty(t, resp.Context)
+
+ time.Sleep(time.Second * 5)
+ // should be new worker with new pid
+ assert.NotEqual(t, pid, p.Workers()[0].Pid())
+}
+
func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
var cfgExecTTL = Config{
NumWorkers: int64(1),
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 348f0459..127dc801 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -236,7 +236,7 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
pid := wb.Pid()
if ww.stack.FindAndRemoveByPid(pid) {
- wb.State().Set(internal.StateInvalid)
+ wb.State().Set(internal.StateRemove)
err := wb.Kill()
if err != nil {
return errors.E(op, err)
@@ -244,7 +244,6 @@ func (ww *workerWatcher) RemoveWorker(wb worker.BaseProcess) error {
return nil
}
- wb.State().Set(internal.StateRemove)
return nil
}
diff --git a/plugins/config/interface.go b/plugins/config/interface.go
index 2a7c67ce..23279f53 100644
--- a/plugins/config/interface.go
+++ b/plugins/config/interface.go
@@ -1,7 +1,7 @@
package config
type Configurer interface {
- // UnmarshalKey reads configuration section into configuration object.
+ // // UnmarshalKey takes a single key and unmarshals it into a Struct.
//
// func (h *HttpService) Init(cp config.Configurer) error {
// h.config := &HttpConfig{}
@@ -11,6 +11,10 @@ type Configurer interface {
// }
UnmarshalKey(name string, out interface{}) error
+ // Unmarshal unmarshals the config into a Struct. Make sure that the tags
+ // on the fields of the structure are properly set.
+ Unmarshal(out interface{}) error
+
// Get used to get config section
Get(name string) interface{}
diff --git a/plugins/config/plugin.go b/plugins/config/plugin.go
index 1a170448..9cecf9f9 100755
--- a/plugins/config/plugin.go
+++ b/plugins/config/plugin.go
@@ -64,6 +64,15 @@ func (v *Viper) UnmarshalKey(name string, out interface{}) error {
return nil
}
+func (v *Viper) Unmarshal(out interface{}) error {
+ const op = errors.Op("config unmarshal")
+ err := v.viper.Unmarshal(&out)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
// Get raw config in a form of config section.
func (v *Viper) Get(name string) interface{} {
return v.viper.Get(name)
diff --git a/plugins/http/config.go b/plugins/http/config.go
index 3b670c86..abde8917 100644
--- a/plugins/http/config.go
+++ b/plugins/http/config.go
@@ -49,10 +49,10 @@ type Config struct {
HTTP2 *HTTP2Config
// MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited.
- MaxRequestSize uint64
+ MaxRequestSize uint64 `yaml:"max_request_size"`
// TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For
- TrustedSubnets []string
+ TrustedSubnets []string `yaml:"trusted_subnets"`
// Uploads configures uploads configuration.
Uploads *UploadsConfig
@@ -85,7 +85,7 @@ type HTTP2Config struct {
H2C bool
// MaxConcurrentStreams defaults to 128.
- MaxConcurrentStreams uint32
+ MaxConcurrentStreams uint32 `yaml:"max_concurrent_streams"`
}
// InitDefaults sets default values for HTTP/2 configuration.
diff --git a/plugins/kv/memcached/storage_test.go b/plugins/kv/memcached/plugin_unit_test.go
index 3d37748b..3d37748b 100644
--- a/plugins/kv/memcached/storage_test.go
+++ b/plugins/kv/memcached/plugin_unit_test.go
diff --git a/plugins/kv/memory/storage_test.go b/plugins/kv/memory/plugin_unit_test.go
index d3b24860..d3b24860 100644
--- a/plugins/kv/memory/storage_test.go
+++ b/plugins/kv/memory/plugin_unit_test.go
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go
index d7531435..88ad7f0e 100644
--- a/plugins/rpc/config.go
+++ b/plugins/rpc/config.go
@@ -12,9 +12,6 @@ import (
type Config struct {
// Listen string
Listen string
-
- // Disabled disables RPC service.
- Disabled bool
}
// InitDefaults allows to init blank config with pre-defined set of default values.
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index 6a83326b..c5813e7b 100644
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -22,17 +22,18 @@ type pluggable struct {
// Plugin is RPC service.
type Plugin struct {
- cfg Config
- log logger.Logger
- rpc *rpc.Server
- services []pluggable
+ cfg Config
+ log logger.Logger
+ rpc *rpc.Server
+ // set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC
+ plugins []pluggable
listener net.Listener
closed *uint32
}
// Init rpc service. Must return true if service is enabled.
func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("RPC plugin init")
+ const op = errors.Op("rpc plugin init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
@@ -43,10 +44,6 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
}
s.cfg.InitDefaults()
- if s.cfg.Disabled {
- return errors.E(op, errors.Disabled)
- }
-
s.log = log
state := uint32(0)
s.closed = &state
@@ -57,22 +54,22 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
// Serve serves the service.
func (s *Plugin) Serve() chan error {
- const op = errors.Op("register service")
+ const op = errors.Op("serve rpc plugin")
errCh := make(chan error, 1)
s.rpc = rpc.NewServer()
- services := make([]string, 0, len(s.services))
+ services := make([]string, 0, len(s.plugins))
// Attach all services
- for i := 0; i < len(s.services); i++ {
- err := s.Register(s.services[i].name, s.services[i].service.RPC())
+ for i := 0; i < len(s.plugins); i++ {
+ err := s.Register(s.plugins[i].name, s.plugins[i].service.RPC())
if err != nil {
errCh <- errors.E(op, err)
return errCh
}
- services = append(services, s.services[i].name)
+ services = append(services, s.plugins[i].name)
}
var err error
@@ -131,7 +128,7 @@ func (s *Plugin) Collects() []interface{} {
// RegisterPlugin registers RPC service plugin.
func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) {
- s.services = append(s.services, pluggable{
+ s.plugins = append(s.plugins, pluggable{
service: p,
name: name.Name(),
})
diff --git a/plugins/server/config.go b/plugins/server/config.go
index 2bf30e70..a990efd3 100644
--- a/plugins/server/config.go
+++ b/plugins/server/config.go
@@ -4,37 +4,144 @@ import (
"time"
)
-// Config config combines factory, pool and cmd configurations.
+// All config (.rr.yaml)
+// For other section use pointer to distinguish between `empty` and `not present`
type Config struct {
- // Command to run as application.
- Command string
+ // Server config section
+ Server struct {
+ // Command to run as application.
+ Command string `yaml:"command"`
+ // User to run application under.
+ User string `yaml:"user"`
+ // Group to run application under.
+ Group string `yaml:"group"`
+ // Env represents application environment.
+ Env Env `yaml:"env"`
+ // Relay defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string `yaml:"relay"`
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration. Defaults to 60s.
+ RelayTimeout time.Duration `yaml:"relayTimeout"`
+ } `yaml:"server"`
- // User to run application under.
- User string
-
- // Group to run application under.
- Group string
-
- // Env represents application environment.
- Env Env
-
- // Listen defines connection method and factory to be used to connect to workers:
- // "pipes", "tcp://:6001", "unix://rr.sock"
- // This config section must not change on re-configuration.
- Relay string
-
- // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
- // must not change on re-configuration. Defaults to 60s.
- RelayTimeout time.Duration
+ RPC *struct {
+ Listen string `yaml:"listen"`
+ } `yaml:"rpc"`
+ Logs *struct {
+ Mode string `yaml:"mode"`
+ Level string `yaml:"level"`
+ } `yaml:"logs"`
+ HTTP *struct {
+ Address string `yaml:"address"`
+ MaxRequestSize int `yaml:"max_request_size"`
+ Middleware []string `yaml:"middleware"`
+ Uploads struct {
+ Forbid []string `yaml:"forbid"`
+ } `yaml:"uploads"`
+ TrustedSubnets []string `yaml:"trusted_subnets"`
+ Pool struct {
+ NumWorkers int `yaml:"num_workers"`
+ MaxJobs int `yaml:"max_jobs"`
+ AllocateTimeout string `yaml:"allocate_timeout"`
+ DestroyTimeout string `yaml:"destroy_timeout"`
+ Supervisor struct {
+ WatchTick int `yaml:"watch_tick"`
+ TTL int `yaml:"ttl"`
+ IdleTTL int `yaml:"idle_ttl"`
+ ExecTTL int `yaml:"exec_ttl"`
+ MaxWorkerMemory int `yaml:"max_worker_memory"`
+ } `yaml:"supervisor"`
+ } `yaml:"pool"`
+ Ssl struct {
+ Port int `yaml:"port"`
+ Redirect bool `yaml:"redirect"`
+ Cert string `yaml:"cert"`
+ Key string `yaml:"key"`
+ } `yaml:"ssl"`
+ Fcgi struct {
+ Address string `yaml:"address"`
+ } `yaml:"fcgi"`
+ HTTP2 struct {
+ Enabled bool `yaml:"enabled"`
+ H2C bool `yaml:"h2c"`
+ MaxConcurrentStreams int `yaml:"max_concurrent_streams"`
+ } `yaml:"http2"`
+ } `yaml:"http"`
+ Redis *struct {
+ Addrs []string `yaml:"addrs"`
+ MasterName string `yaml:"master_name"`
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+ DB int `yaml:"db"`
+ SentinelPassword string `yaml:"sentinel_password"`
+ RouteByLatency bool `yaml:"route_by_latency"`
+ RouteRandomly bool `yaml:"route_randomly"`
+ DialTimeout int `yaml:"dial_timeout"`
+ MaxRetries int `yaml:"max_retries"`
+ MinRetryBackoff int `yaml:"min_retry_backoff"`
+ MaxRetryBackoff int `yaml:"max_retry_backoff"`
+ PoolSize int `yaml:"pool_size"`
+ MinIdleConns int `yaml:"min_idle_conns"`
+ MaxConnAge int `yaml:"max_conn_age"`
+ ReadTimeout int `yaml:"read_timeout"`
+ WriteTimeout int `yaml:"write_timeout"`
+ PoolTimeout int `yaml:"pool_timeout"`
+ IdleTimeout int `yaml:"idle_timeout"`
+ IdleCheckFreq int `yaml:"idle_check_freq"`
+ ReadOnly bool `yaml:"read_only"`
+ } `yaml:"redis"`
+ Boltdb *struct {
+ Dir string `yaml:"dir"`
+ File string `yaml:"file"`
+ Bucket string `yaml:"bucket"`
+ Permissions int `yaml:"permissions"`
+ TTL int `yaml:"TTL"`
+ } `yaml:"boltdb"`
+ Memcached *struct {
+ Addr []string `yaml:"addr"`
+ } `yaml:"memcached"`
+ Memory *struct {
+ Enabled bool `yaml:"enabled"`
+ Interval int `yaml:"interval"`
+ } `yaml:"memory"`
+ Metrics *struct {
+ Address string `yaml:"address"`
+ Collect struct {
+ AppMetric struct {
+ Type string `yaml:"type"`
+ Help string `yaml:"help"`
+ Labels []string `yaml:"labels"`
+ Buckets []float64 `yaml:"buckets"`
+ Objectives []struct {
+ Num2 float64 `yaml:"2,omitempty"`
+ One4 float64 `yaml:"1.4,omitempty"`
+ } `yaml:"objectives"`
+ } `yaml:"app_metric"`
+ } `yaml:"collect"`
+ } `yaml:"metrics"`
+ Reload *struct {
+ Interval string `yaml:"interval"`
+ Patterns []string `yaml:"patterns"`
+ Services struct {
+ HTTP struct {
+ Recursive bool `yaml:"recursive"`
+ Ignore []string `yaml:"ignore"`
+ Patterns []string `yaml:"patterns"`
+ Dirs []string `yaml:"dirs"`
+ } `yaml:"http"`
+ } `yaml:"services"`
+ } `yaml:"reload"`
}
// InitDefaults for the server config
func (cfg *Config) InitDefaults() {
- if cfg.Relay == "" {
- cfg.Relay = "pipes"
+ if cfg.Server.Relay == "" {
+ cfg.Server.Relay = "pipes"
}
- if cfg.RelayTimeout == 0 {
- cfg.RelayTimeout = time.Second * 60
+ if cfg.Server.RelayTimeout == 0 {
+ cfg.Server.RelayTimeout = time.Second * 60
}
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 8d8a7694..565c80c4 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -24,6 +24,13 @@ import (
// PluginName for the server
const PluginName = "server"
+// RR_RELAY env variable key (internal)
+const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck
+// RR_RPC env variable key (internal) if the RPC presents
+const RR_RPC = "" //nolint:golint,stylecheck
+// RR_HTTP env variable key (internal) if the HTTP presents
+const RR_HTTP = "false" //nolint:golint,stylecheck
+
// Plugin manages worker
type Plugin struct {
cfg Config
@@ -34,7 +41,7 @@ type Plugin struct {
// Init application provider.
func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("Init")
- err := cfg.UnmarshalKey(PluginName, &server.cfg)
+ err := cfg.Unmarshal(&server.cfg)
if err != nil {
return errors.E(op, errors.Init, err)
}
@@ -43,7 +50,7 @@ func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
server.factory, err = server.initFactory()
if err != nil {
- return errors.E(errors.Op("Init factory"), err)
+ return errors.E(err)
}
return nil
@@ -75,7 +82,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
var cmdArgs []string
// create command according to the config
- cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...)
+ cmdArgs = append(cmdArgs, strings.Split(server.cfg.Server.Command, " ")...)
if len(cmdArgs) < 2 {
return nil, errors.E(op, errors.Str("should be in form of `php <script>"))
}
@@ -93,8 +100,8 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
// if user is not empty, and OS is linux or macos
// execute php worker from that particular user
- if server.cfg.User != "" {
- err := utils.ExecuteFromUser(cmd, server.cfg.User)
+ if server.cfg.Server.User != "" {
+ err := utils.ExecuteFromUser(cmd, server.cfg.Server.User)
if err != nil {
return nil
}
@@ -150,17 +157,17 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, en
// creates relay and worker factory.
func (server *Plugin) initFactory() (worker.Factory, error) {
- const op = errors.Op("network factory init")
- if server.cfg.Relay == "" || server.cfg.Relay == "pipes" {
+ const op = errors.Op("server factory init")
+ if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" {
return pipe.NewPipeFactory(), nil
}
- dsn := strings.Split(server.cfg.Relay, "://")
+ dsn := strings.Split(server.cfg.Server.Relay, "://")
if len(dsn) != 2 {
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
- lsn, err := utils.CreateListener(server.cfg.Relay)
+ lsn, err := utils.CreateListener(server.cfg.Server.Relay)
if err != nil {
return nil, errors.E(op, errors.Network, err)
}
@@ -168,20 +175,35 @@ func (server *Plugin) initFactory() (worker.Factory, error) {
switch dsn[0] {
// sockets group
case "unix":
- return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil
case "tcp":
- return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil
default:
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
func (server *Plugin) setEnv(e Env) []string {
- env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay))
+ env := append(os.Environ(), fmt.Sprintf(RR_RELAY+"=%s", server.cfg.Server.Relay))
for k, v := range e {
env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
}
+ // set internal env variables
+ if server.cfg.HTTP != nil {
+ env = append(env, fmt.Sprintf("%s=%s", RR_HTTP, "true"))
+ }
+ if server.cfg.RPC != nil && server.cfg.RPC.Listen != "" {
+ env = append(env, fmt.Sprintf("%s=%s", RR_RPC, server.cfg.RPC.Listen))
+ }
+
+ // set env variables from the config
+ if len(server.cfg.Server.Env) > 0 {
+ for k, v := range server.cfg.Server.Env {
+ env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
+ }
+ }
+
return env
}
@@ -195,7 +217,7 @@ func (server *Plugin) collectPoolLogs(event interface{}) {
case events.EventPoolError:
server.log.Info("pool error", "error", we.Payload.(error).Error())
case events.EventSupervisorError:
- server.log.Info("pool supervizor error", "error", we.Payload.(error).Error())
+ server.log.Info("pool supervisor error", "error", we.Payload.(error).Error())
case events.EventTTL:
server.log.Info("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid())
case events.EventWorkerConstruct:
diff --git a/tests/plugins/config/.rr.yaml b/tests/plugins/config/.rr.yaml
index 732a1366..bad2846a 100755
--- a/tests/plugins/config/.rr.yaml
+++ b/tests/plugins/config/.rr.yaml
@@ -1,3 +1,6 @@
+rpc:
+ listen: tcp://localhost:6060
+
reload:
enabled: true
interval: 1s
diff --git a/tests/plugins/config/config_test.go b/tests/plugins/config/config_test.go
index 858fcb80..6d95ba70 100755
--- a/tests/plugins/config/config_test.go
+++ b/tests/plugins/config/config_test.go
@@ -44,6 +44,7 @@ func TestViperProvider_Init(t *testing.T) {
signal.Notify(c, os.Interrupt)
tt := time.NewTicker(time.Second * 2)
+ defer tt.Stop()
for {
select {
@@ -53,12 +54,9 @@ func TestViperProvider_Init(t *testing.T) {
return
case <-c:
er := container.Stop()
- if er != nil {
- panic(er)
- }
+ assert.NoError(t, er)
return
case <-tt.C:
- tt.Stop()
assert.NoError(t, container.Stop())
return
}
diff --git a/tests/plugins/config/plugin1.go b/tests/plugins/config/plugin1.go
index 2afe79a4..a6c06aec 100755
--- a/tests/plugins/config/plugin1.go
+++ b/tests/plugins/config/plugin1.go
@@ -1,12 +1,41 @@
package config
import (
- "errors"
"time"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/config"
)
+type AllConfig struct {
+ RPC struct {
+ Listen string `yaml:"listen"`
+ } `yaml:"rpc"`
+ Reload struct {
+ Enabled bool `yaml:"enabled"`
+ Interval string `yaml:"interval"`
+ Patterns []string `yaml:"patterns"`
+ Services struct {
+ HTTP struct {
+ Recursive bool `yaml:"recursive"`
+ Ignore []string `yaml:"ignore"`
+ Patterns []string `yaml:"patterns"`
+ Dirs []string `yaml:"dirs"`
+ } `yaml:"http"`
+ Jobs struct {
+ Recursive bool `yaml:"recursive"`
+ Ignore []string `yaml:"ignore"`
+ Dirs []string `yaml:"dirs"`
+ } `yaml:"jobs"`
+ RPC struct {
+ Recursive bool `yaml:"recursive"`
+ Patterns []string `yaml:"patterns"`
+ Dirs []string `yaml:"dirs"`
+ } `yaml:"rpc"`
+ } `yaml:"services"`
+ } `yaml:"reload"`
+}
+
// ReloadConfig is a Reload configuration point.
type ReloadConfig struct {
Interval time.Duration
@@ -33,6 +62,7 @@ func (f *Foo) Init(p config.Configurer) error {
}
func (f *Foo) Serve() chan error {
+ const op = errors.Op("foo serve")
errCh := make(chan error, 1)
r := &ReloadConfig{}
@@ -42,7 +72,20 @@ func (f *Foo) Serve() chan error {
}
if len(r.Patterns) == 0 {
- errCh <- errors.New("should be at least one pattern, but got 0")
+ errCh <- errors.E(op, errors.Str("should be at least one pattern, but got 0"))
+ return errCh
+ }
+
+ var allCfg AllConfig
+ err = f.configProvider.Unmarshal(&allCfg)
+ if err != nil {
+ errCh <- errors.E(op, errors.Str("should be at least one pattern, but got 0"))
+ return errCh
+ }
+
+ if allCfg.RPC.Listen != "tcp://localhost:6060" {
+ errCh <- errors.E(op, errors.Str("RPC.Listen should be parsed"))
+ return errCh
}
return errCh
diff --git a/tests/plugins/kv/boltdb/plugin_test.go b/tests/plugins/kv/boltdb/plugin_test.go
index ba9b695a..5548402d 100644
--- a/tests/plugins/kv/boltdb/plugin_test.go
+++ b/tests/plugins/kv/boltdb/plugin_test.go
@@ -172,7 +172,7 @@ func testRPCMethods(t *testing.T) {
assert.Len(t, ttlRes, 3)
// HAS AFTER TTL
- time.Sleep(time.Second * 11)
+ time.Sleep(time.Second * 15)
ret = make(map[string]bool)
keys = []string{"a", "b", "d"}
err = client.Call("boltdb.Has", keys, &ret)
diff --git a/tests/plugins/kv/memcached/plugin_test.go b/tests/plugins/kv/memcached/plugin_test.go
index 6eff8715..d4cb58bb 100644
--- a/tests/plugins/kv/memcached/plugin_test.go
+++ b/tests/plugins/kv/memcached/plugin_test.go
@@ -172,7 +172,7 @@ func testRPCMethods(t *testing.T) {
assert.Len(t, ttlRes, 0)
// HAS AFTER TTL
- time.Sleep(time.Second * 11)
+ time.Sleep(time.Second * 15)
ret = make(map[string]bool)
keys = []string{"a", "b", "d"}
err = client.Call("memcached.Has", keys, &ret)
diff --git a/tests/plugins/kv/memory/plugin_test.go b/tests/plugins/kv/memory/plugin_test.go
index c6f94602..ee01fabb 100644
--- a/tests/plugins/kv/memory/plugin_test.go
+++ b/tests/plugins/kv/memory/plugin_test.go
@@ -172,7 +172,7 @@ func testRPCMethods(t *testing.T) {
assert.Len(t, ttlRes, 3)
// HAS AFTER TTL
- time.Sleep(time.Second * 11)
+ time.Sleep(time.Second * 15)
ret = make(map[string]bool)
keys = []string{"a", "b", "d"}
err = client.Call("memory.Has", keys, &ret)
diff --git a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml
index d5c185e7..5ab359d3 100644
--- a/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml
+++ b/tests/plugins/rpc/configs/.rr-rpc-disabled.yaml
@@ -1,6 +1,3 @@
-rpc:
- listen: tcp://127.0.0.1:6001
- disabled: true
logs:
mode: development
level: error \ No newline at end of file
diff --git a/tests/plugins/rpc/configs/.rr.yaml b/tests/plugins/rpc/configs/.rr.yaml
index d2cb6c70..67d935e3 100644
--- a/tests/plugins/rpc/configs/.rr.yaml
+++ b/tests/plugins/rpc/configs/.rr.yaml
@@ -1,6 +1,5 @@
rpc:
listen: tcp://127.0.0.1:6001
- disabled: false
logs:
mode: development
level: error \ No newline at end of file