summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-13 11:11:36 +0300
committerValery Piashchynski <[email protected]>2021-01-13 11:11:36 +0300
commitc3cf1d988b980e9408862d380f7ae33dae501e05 (patch)
tree79430ed15f75e23242e921a1471633e33279c395 /plugins
parent44b0ad21e0d70e413a62814fb408faa033b0d478 (diff)
Update styly of the .rr.yaml
Add comments Add support for the automatically set RR_RPC, RR_HTTP env variables for the worker process.
Diffstat (limited to 'plugins')
-rw-r--r--plugins/config/interface.go6
-rwxr-xr-xplugins/config/plugin.go9
-rw-r--r--plugins/http/config.go6
-rw-r--r--plugins/rpc/config.go3
-rw-r--r--plugins/rpc/plugin.go27
-rw-r--r--plugins/server/config.go155
-rw-r--r--plugins/server/plugin.go45
7 files changed, 192 insertions, 59 deletions
diff --git a/plugins/config/interface.go b/plugins/config/interface.go
index 2a7c67ce..23279f53 100644
--- a/plugins/config/interface.go
+++ b/plugins/config/interface.go
@@ -1,7 +1,7 @@
package config
type Configurer interface {
- // UnmarshalKey reads configuration section into configuration object.
+ // // UnmarshalKey takes a single key and unmarshals it into a Struct.
//
// func (h *HttpService) Init(cp config.Configurer) error {
// h.config := &HttpConfig{}
@@ -11,6 +11,10 @@ type Configurer interface {
// }
UnmarshalKey(name string, out interface{}) error
+ // Unmarshal unmarshals the config into a Struct. Make sure that the tags
+ // on the fields of the structure are properly set.
+ Unmarshal(out interface{}) error
+
// Get used to get config section
Get(name string) interface{}
diff --git a/plugins/config/plugin.go b/plugins/config/plugin.go
index 1a170448..9cecf9f9 100755
--- a/plugins/config/plugin.go
+++ b/plugins/config/plugin.go
@@ -64,6 +64,15 @@ func (v *Viper) UnmarshalKey(name string, out interface{}) error {
return nil
}
+func (v *Viper) Unmarshal(out interface{}) error {
+ const op = errors.Op("config unmarshal")
+ err := v.viper.Unmarshal(&out)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+
// Get raw config in a form of config section.
func (v *Viper) Get(name string) interface{} {
return v.viper.Get(name)
diff --git a/plugins/http/config.go b/plugins/http/config.go
index 3b670c86..abde8917 100644
--- a/plugins/http/config.go
+++ b/plugins/http/config.go
@@ -49,10 +49,10 @@ type Config struct {
HTTP2 *HTTP2Config
// MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited.
- MaxRequestSize uint64
+ MaxRequestSize uint64 `yaml:"max_request_size"`
// TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For
- TrustedSubnets []string
+ TrustedSubnets []string `yaml:"trusted_subnets"`
// Uploads configures uploads configuration.
Uploads *UploadsConfig
@@ -85,7 +85,7 @@ type HTTP2Config struct {
H2C bool
// MaxConcurrentStreams defaults to 128.
- MaxConcurrentStreams uint32
+ MaxConcurrentStreams uint32 `yaml:"max_concurrent_streams"`
}
// InitDefaults sets default values for HTTP/2 configuration.
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go
index d7531435..88ad7f0e 100644
--- a/plugins/rpc/config.go
+++ b/plugins/rpc/config.go
@@ -12,9 +12,6 @@ import (
type Config struct {
// Listen string
Listen string
-
- // Disabled disables RPC service.
- Disabled bool
}
// InitDefaults allows to init blank config with pre-defined set of default values.
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index 6a83326b..c5813e7b 100644
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -22,17 +22,18 @@ type pluggable struct {
// Plugin is RPC service.
type Plugin struct {
- cfg Config
- log logger.Logger
- rpc *rpc.Server
- services []pluggable
+ cfg Config
+ log logger.Logger
+ rpc *rpc.Server
+ // set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC
+ plugins []pluggable
listener net.Listener
closed *uint32
}
// Init rpc service. Must return true if service is enabled.
func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("RPC plugin init")
+ const op = errors.Op("rpc plugin init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
@@ -43,10 +44,6 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
}
s.cfg.InitDefaults()
- if s.cfg.Disabled {
- return errors.E(op, errors.Disabled)
- }
-
s.log = log
state := uint32(0)
s.closed = &state
@@ -57,22 +54,22 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
// Serve serves the service.
func (s *Plugin) Serve() chan error {
- const op = errors.Op("register service")
+ const op = errors.Op("serve rpc plugin")
errCh := make(chan error, 1)
s.rpc = rpc.NewServer()
- services := make([]string, 0, len(s.services))
+ services := make([]string, 0, len(s.plugins))
// Attach all services
- for i := 0; i < len(s.services); i++ {
- err := s.Register(s.services[i].name, s.services[i].service.RPC())
+ for i := 0; i < len(s.plugins); i++ {
+ err := s.Register(s.plugins[i].name, s.plugins[i].service.RPC())
if err != nil {
errCh <- errors.E(op, err)
return errCh
}
- services = append(services, s.services[i].name)
+ services = append(services, s.plugins[i].name)
}
var err error
@@ -131,7 +128,7 @@ func (s *Plugin) Collects() []interface{} {
// RegisterPlugin registers RPC service plugin.
func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) {
- s.services = append(s.services, pluggable{
+ s.plugins = append(s.plugins, pluggable{
service: p,
name: name.Name(),
})
diff --git a/plugins/server/config.go b/plugins/server/config.go
index 2bf30e70..a990efd3 100644
--- a/plugins/server/config.go
+++ b/plugins/server/config.go
@@ -4,37 +4,144 @@ import (
"time"
)
-// Config config combines factory, pool and cmd configurations.
+// All config (.rr.yaml)
+// For other section use pointer to distinguish between `empty` and `not present`
type Config struct {
- // Command to run as application.
- Command string
+ // Server config section
+ Server struct {
+ // Command to run as application.
+ Command string `yaml:"command"`
+ // User to run application under.
+ User string `yaml:"user"`
+ // Group to run application under.
+ Group string `yaml:"group"`
+ // Env represents application environment.
+ Env Env `yaml:"env"`
+ // Relay defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string `yaml:"relay"`
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration. Defaults to 60s.
+ RelayTimeout time.Duration `yaml:"relayTimeout"`
+ } `yaml:"server"`
- // User to run application under.
- User string
-
- // Group to run application under.
- Group string
-
- // Env represents application environment.
- Env Env
-
- // Listen defines connection method and factory to be used to connect to workers:
- // "pipes", "tcp://:6001", "unix://rr.sock"
- // This config section must not change on re-configuration.
- Relay string
-
- // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
- // must not change on re-configuration. Defaults to 60s.
- RelayTimeout time.Duration
+ RPC *struct {
+ Listen string `yaml:"listen"`
+ } `yaml:"rpc"`
+ Logs *struct {
+ Mode string `yaml:"mode"`
+ Level string `yaml:"level"`
+ } `yaml:"logs"`
+ HTTP *struct {
+ Address string `yaml:"address"`
+ MaxRequestSize int `yaml:"max_request_size"`
+ Middleware []string `yaml:"middleware"`
+ Uploads struct {
+ Forbid []string `yaml:"forbid"`
+ } `yaml:"uploads"`
+ TrustedSubnets []string `yaml:"trusted_subnets"`
+ Pool struct {
+ NumWorkers int `yaml:"num_workers"`
+ MaxJobs int `yaml:"max_jobs"`
+ AllocateTimeout string `yaml:"allocate_timeout"`
+ DestroyTimeout string `yaml:"destroy_timeout"`
+ Supervisor struct {
+ WatchTick int `yaml:"watch_tick"`
+ TTL int `yaml:"ttl"`
+ IdleTTL int `yaml:"idle_ttl"`
+ ExecTTL int `yaml:"exec_ttl"`
+ MaxWorkerMemory int `yaml:"max_worker_memory"`
+ } `yaml:"supervisor"`
+ } `yaml:"pool"`
+ Ssl struct {
+ Port int `yaml:"port"`
+ Redirect bool `yaml:"redirect"`
+ Cert string `yaml:"cert"`
+ Key string `yaml:"key"`
+ } `yaml:"ssl"`
+ Fcgi struct {
+ Address string `yaml:"address"`
+ } `yaml:"fcgi"`
+ HTTP2 struct {
+ Enabled bool `yaml:"enabled"`
+ H2C bool `yaml:"h2c"`
+ MaxConcurrentStreams int `yaml:"max_concurrent_streams"`
+ } `yaml:"http2"`
+ } `yaml:"http"`
+ Redis *struct {
+ Addrs []string `yaml:"addrs"`
+ MasterName string `yaml:"master_name"`
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+ DB int `yaml:"db"`
+ SentinelPassword string `yaml:"sentinel_password"`
+ RouteByLatency bool `yaml:"route_by_latency"`
+ RouteRandomly bool `yaml:"route_randomly"`
+ DialTimeout int `yaml:"dial_timeout"`
+ MaxRetries int `yaml:"max_retries"`
+ MinRetryBackoff int `yaml:"min_retry_backoff"`
+ MaxRetryBackoff int `yaml:"max_retry_backoff"`
+ PoolSize int `yaml:"pool_size"`
+ MinIdleConns int `yaml:"min_idle_conns"`
+ MaxConnAge int `yaml:"max_conn_age"`
+ ReadTimeout int `yaml:"read_timeout"`
+ WriteTimeout int `yaml:"write_timeout"`
+ PoolTimeout int `yaml:"pool_timeout"`
+ IdleTimeout int `yaml:"idle_timeout"`
+ IdleCheckFreq int `yaml:"idle_check_freq"`
+ ReadOnly bool `yaml:"read_only"`
+ } `yaml:"redis"`
+ Boltdb *struct {
+ Dir string `yaml:"dir"`
+ File string `yaml:"file"`
+ Bucket string `yaml:"bucket"`
+ Permissions int `yaml:"permissions"`
+ TTL int `yaml:"TTL"`
+ } `yaml:"boltdb"`
+ Memcached *struct {
+ Addr []string `yaml:"addr"`
+ } `yaml:"memcached"`
+ Memory *struct {
+ Enabled bool `yaml:"enabled"`
+ Interval int `yaml:"interval"`
+ } `yaml:"memory"`
+ Metrics *struct {
+ Address string `yaml:"address"`
+ Collect struct {
+ AppMetric struct {
+ Type string `yaml:"type"`
+ Help string `yaml:"help"`
+ Labels []string `yaml:"labels"`
+ Buckets []float64 `yaml:"buckets"`
+ Objectives []struct {
+ Num2 float64 `yaml:"2,omitempty"`
+ One4 float64 `yaml:"1.4,omitempty"`
+ } `yaml:"objectives"`
+ } `yaml:"app_metric"`
+ } `yaml:"collect"`
+ } `yaml:"metrics"`
+ Reload *struct {
+ Interval string `yaml:"interval"`
+ Patterns []string `yaml:"patterns"`
+ Services struct {
+ HTTP struct {
+ Recursive bool `yaml:"recursive"`
+ Ignore []string `yaml:"ignore"`
+ Patterns []string `yaml:"patterns"`
+ Dirs []string `yaml:"dirs"`
+ } `yaml:"http"`
+ } `yaml:"services"`
+ } `yaml:"reload"`
}
// InitDefaults for the server config
func (cfg *Config) InitDefaults() {
- if cfg.Relay == "" {
- cfg.Relay = "pipes"
+ if cfg.Server.Relay == "" {
+ cfg.Server.Relay = "pipes"
}
- if cfg.RelayTimeout == 0 {
- cfg.RelayTimeout = time.Second * 60
+ if cfg.Server.RelayTimeout == 0 {
+ cfg.Server.RelayTimeout = time.Second * 60
}
}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index e4c91c9e..565c80c4 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -24,8 +24,12 @@ import (
// PluginName for the server
const PluginName = "server"
-// RR_RELAY env variable key
+// RR_RELAY env variable key (internal)
const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck
+// RR_RPC env variable key (internal) if the RPC presents
+const RR_RPC = "" //nolint:golint,stylecheck
+// RR_HTTP env variable key (internal) if the HTTP presents
+const RR_HTTP = "false" //nolint:golint,stylecheck
// Plugin manages worker
type Plugin struct {
@@ -37,7 +41,7 @@ type Plugin struct {
// Init application provider.
func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("Init")
- err := cfg.UnmarshalKey(PluginName, &server.cfg)
+ err := cfg.Unmarshal(&server.cfg)
if err != nil {
return errors.E(op, errors.Init, err)
}
@@ -46,7 +50,7 @@ func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
server.factory, err = server.initFactory()
if err != nil {
- return errors.E(errors.Op("Init factory"), err)
+ return errors.E(err)
}
return nil
@@ -78,7 +82,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
var cmdArgs []string
// create command according to the config
- cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...)
+ cmdArgs = append(cmdArgs, strings.Split(server.cfg.Server.Command, " ")...)
if len(cmdArgs) < 2 {
return nil, errors.E(op, errors.Str("should be in form of `php <script>"))
}
@@ -96,8 +100,8 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
// if user is not empty, and OS is linux or macos
// execute php worker from that particular user
- if server.cfg.User != "" {
- err := utils.ExecuteFromUser(cmd, server.cfg.User)
+ if server.cfg.Server.User != "" {
+ err := utils.ExecuteFromUser(cmd, server.cfg.Server.User)
if err != nil {
return nil
}
@@ -153,17 +157,17 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, en
// creates relay and worker factory.
func (server *Plugin) initFactory() (worker.Factory, error) {
- const op = errors.Op("network factory init")
- if server.cfg.Relay == "" || server.cfg.Relay == "pipes" {
+ const op = errors.Op("server factory init")
+ if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" {
return pipe.NewPipeFactory(), nil
}
- dsn := strings.Split(server.cfg.Relay, "://")
+ dsn := strings.Split(server.cfg.Server.Relay, "://")
if len(dsn) != 2 {
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
- lsn, err := utils.CreateListener(server.cfg.Relay)
+ lsn, err := utils.CreateListener(server.cfg.Server.Relay)
if err != nil {
return nil, errors.E(op, errors.Network, err)
}
@@ -171,20 +175,35 @@ func (server *Plugin) initFactory() (worker.Factory, error) {
switch dsn[0] {
// sockets group
case "unix":
- return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil
case "tcp":
- return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
+ return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil
default:
return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
func (server *Plugin) setEnv(e Env) []string {
- env := append(os.Environ(), fmt.Sprintf(RR_RELAY+"=%s", server.cfg.Relay))
+ env := append(os.Environ(), fmt.Sprintf(RR_RELAY+"=%s", server.cfg.Server.Relay))
for k, v := range e {
env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
}
+ // set internal env variables
+ if server.cfg.HTTP != nil {
+ env = append(env, fmt.Sprintf("%s=%s", RR_HTTP, "true"))
+ }
+ if server.cfg.RPC != nil && server.cfg.RPC.Listen != "" {
+ env = append(env, fmt.Sprintf("%s=%s", RR_RPC, server.cfg.RPC.Listen))
+ }
+
+ // set env variables from the config
+ if len(server.cfg.Server.Env) > 0 {
+ for k, v := range server.cfg.Server.Env {
+ env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
+ }
+ }
+
return env
}