diff options
author | Valery Piashchynski <[email protected]> | 2021-01-13 11:11:36 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-01-13 11:11:36 +0300 |
commit | c3cf1d988b980e9408862d380f7ae33dae501e05 (patch) | |
tree | 79430ed15f75e23242e921a1471633e33279c395 /plugins | |
parent | 44b0ad21e0d70e413a62814fb408faa033b0d478 (diff) |
Update styly of the .rr.yaml
Add comments
Add support for the automatically set RR_RPC, RR_HTTP env variables for
the worker process.
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/config/interface.go | 6 | ||||
-rwxr-xr-x | plugins/config/plugin.go | 9 | ||||
-rw-r--r-- | plugins/http/config.go | 6 | ||||
-rw-r--r-- | plugins/rpc/config.go | 3 | ||||
-rw-r--r-- | plugins/rpc/plugin.go | 27 | ||||
-rw-r--r-- | plugins/server/config.go | 155 | ||||
-rw-r--r-- | plugins/server/plugin.go | 45 |
7 files changed, 192 insertions, 59 deletions
diff --git a/plugins/config/interface.go b/plugins/config/interface.go index 2a7c67ce..23279f53 100644 --- a/plugins/config/interface.go +++ b/plugins/config/interface.go @@ -1,7 +1,7 @@ package config type Configurer interface { - // UnmarshalKey reads configuration section into configuration object. + // // UnmarshalKey takes a single key and unmarshals it into a Struct. // // func (h *HttpService) Init(cp config.Configurer) error { // h.config := &HttpConfig{} @@ -11,6 +11,10 @@ type Configurer interface { // } UnmarshalKey(name string, out interface{}) error + // Unmarshal unmarshals the config into a Struct. Make sure that the tags + // on the fields of the structure are properly set. + Unmarshal(out interface{}) error + // Get used to get config section Get(name string) interface{} diff --git a/plugins/config/plugin.go b/plugins/config/plugin.go index 1a170448..9cecf9f9 100755 --- a/plugins/config/plugin.go +++ b/plugins/config/plugin.go @@ -64,6 +64,15 @@ func (v *Viper) UnmarshalKey(name string, out interface{}) error { return nil } +func (v *Viper) Unmarshal(out interface{}) error { + const op = errors.Op("config unmarshal") + err := v.viper.Unmarshal(&out) + if err != nil { + return errors.E(op, err) + } + return nil +} + // Get raw config in a form of config section. func (v *Viper) Get(name string) interface{} { return v.viper.Get(name) diff --git a/plugins/http/config.go b/plugins/http/config.go index 3b670c86..abde8917 100644 --- a/plugins/http/config.go +++ b/plugins/http/config.go @@ -49,10 +49,10 @@ type Config struct { HTTP2 *HTTP2Config // MaxRequestSize specified max size for payload body in megabytes, set 0 to unlimited. - MaxRequestSize uint64 + MaxRequestSize uint64 `yaml:"max_request_size"` // TrustedSubnets declare IP subnets which are allowed to set ip using X-Real-Ip and X-Forwarded-For - TrustedSubnets []string + TrustedSubnets []string `yaml:"trusted_subnets"` // Uploads configures uploads configuration. Uploads *UploadsConfig @@ -85,7 +85,7 @@ type HTTP2Config struct { H2C bool // MaxConcurrentStreams defaults to 128. - MaxConcurrentStreams uint32 + MaxConcurrentStreams uint32 `yaml:"max_concurrent_streams"` } // InitDefaults sets default values for HTTP/2 configuration. diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go index d7531435..88ad7f0e 100644 --- a/plugins/rpc/config.go +++ b/plugins/rpc/config.go @@ -12,9 +12,6 @@ import ( type Config struct { // Listen string Listen string - - // Disabled disables RPC service. - Disabled bool } // InitDefaults allows to init blank config with pre-defined set of default values. diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index 6a83326b..c5813e7b 100644 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -22,17 +22,18 @@ type pluggable struct { // Plugin is RPC service. type Plugin struct { - cfg Config - log logger.Logger - rpc *rpc.Server - services []pluggable + cfg Config + log logger.Logger + rpc *rpc.Server + // set of the plugins, which are implement RPCer interface and can be plugged into the RR via RPC + plugins []pluggable listener net.Listener closed *uint32 } // Init rpc service. Must return true if service is enabled. func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("RPC plugin init") + const op = errors.Op("rpc plugin init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } @@ -43,10 +44,6 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { } s.cfg.InitDefaults() - if s.cfg.Disabled { - return errors.E(op, errors.Disabled) - } - s.log = log state := uint32(0) s.closed = &state @@ -57,22 +54,22 @@ func (s *Plugin) Init(cfg config.Configurer, log logger.Logger) error { // Serve serves the service. func (s *Plugin) Serve() chan error { - const op = errors.Op("register service") + const op = errors.Op("serve rpc plugin") errCh := make(chan error, 1) s.rpc = rpc.NewServer() - services := make([]string, 0, len(s.services)) + services := make([]string, 0, len(s.plugins)) // Attach all services - for i := 0; i < len(s.services); i++ { - err := s.Register(s.services[i].name, s.services[i].service.RPC()) + for i := 0; i < len(s.plugins); i++ { + err := s.Register(s.plugins[i].name, s.plugins[i].service.RPC()) if err != nil { errCh <- errors.E(op, err) return errCh } - services = append(services, s.services[i].name) + services = append(services, s.plugins[i].name) } var err error @@ -131,7 +128,7 @@ func (s *Plugin) Collects() []interface{} { // RegisterPlugin registers RPC service plugin. func (s *Plugin) RegisterPlugin(name endure.Named, p RPCer) { - s.services = append(s.services, pluggable{ + s.plugins = append(s.plugins, pluggable{ service: p, name: name.Name(), }) diff --git a/plugins/server/config.go b/plugins/server/config.go index 2bf30e70..a990efd3 100644 --- a/plugins/server/config.go +++ b/plugins/server/config.go @@ -4,37 +4,144 @@ import ( "time" ) -// Config config combines factory, pool and cmd configurations. +// All config (.rr.yaml) +// For other section use pointer to distinguish between `empty` and `not present` type Config struct { - // Command to run as application. - Command string + // Server config section + Server struct { + // Command to run as application. + Command string `yaml:"command"` + // User to run application under. + User string `yaml:"user"` + // Group to run application under. + Group string `yaml:"group"` + // Env represents application environment. + Env Env `yaml:"env"` + // Relay defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string `yaml:"relay"` + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration `yaml:"relayTimeout"` + } `yaml:"server"` - // User to run application under. - User string - - // Group to run application under. - Group string - - // Env represents application environment. - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. Defaults to 60s. - RelayTimeout time.Duration + RPC *struct { + Listen string `yaml:"listen"` + } `yaml:"rpc"` + Logs *struct { + Mode string `yaml:"mode"` + Level string `yaml:"level"` + } `yaml:"logs"` + HTTP *struct { + Address string `yaml:"address"` + MaxRequestSize int `yaml:"max_request_size"` + Middleware []string `yaml:"middleware"` + Uploads struct { + Forbid []string `yaml:"forbid"` + } `yaml:"uploads"` + TrustedSubnets []string `yaml:"trusted_subnets"` + Pool struct { + NumWorkers int `yaml:"num_workers"` + MaxJobs int `yaml:"max_jobs"` + AllocateTimeout string `yaml:"allocate_timeout"` + DestroyTimeout string `yaml:"destroy_timeout"` + Supervisor struct { + WatchTick int `yaml:"watch_tick"` + TTL int `yaml:"ttl"` + IdleTTL int `yaml:"idle_ttl"` + ExecTTL int `yaml:"exec_ttl"` + MaxWorkerMemory int `yaml:"max_worker_memory"` + } `yaml:"supervisor"` + } `yaml:"pool"` + Ssl struct { + Port int `yaml:"port"` + Redirect bool `yaml:"redirect"` + Cert string `yaml:"cert"` + Key string `yaml:"key"` + } `yaml:"ssl"` + Fcgi struct { + Address string `yaml:"address"` + } `yaml:"fcgi"` + HTTP2 struct { + Enabled bool `yaml:"enabled"` + H2C bool `yaml:"h2c"` + MaxConcurrentStreams int `yaml:"max_concurrent_streams"` + } `yaml:"http2"` + } `yaml:"http"` + Redis *struct { + Addrs []string `yaml:"addrs"` + MasterName string `yaml:"master_name"` + Username string `yaml:"username"` + Password string `yaml:"password"` + DB int `yaml:"db"` + SentinelPassword string `yaml:"sentinel_password"` + RouteByLatency bool `yaml:"route_by_latency"` + RouteRandomly bool `yaml:"route_randomly"` + DialTimeout int `yaml:"dial_timeout"` + MaxRetries int `yaml:"max_retries"` + MinRetryBackoff int `yaml:"min_retry_backoff"` + MaxRetryBackoff int `yaml:"max_retry_backoff"` + PoolSize int `yaml:"pool_size"` + MinIdleConns int `yaml:"min_idle_conns"` + MaxConnAge int `yaml:"max_conn_age"` + ReadTimeout int `yaml:"read_timeout"` + WriteTimeout int `yaml:"write_timeout"` + PoolTimeout int `yaml:"pool_timeout"` + IdleTimeout int `yaml:"idle_timeout"` + IdleCheckFreq int `yaml:"idle_check_freq"` + ReadOnly bool `yaml:"read_only"` + } `yaml:"redis"` + Boltdb *struct { + Dir string `yaml:"dir"` + File string `yaml:"file"` + Bucket string `yaml:"bucket"` + Permissions int `yaml:"permissions"` + TTL int `yaml:"TTL"` + } `yaml:"boltdb"` + Memcached *struct { + Addr []string `yaml:"addr"` + } `yaml:"memcached"` + Memory *struct { + Enabled bool `yaml:"enabled"` + Interval int `yaml:"interval"` + } `yaml:"memory"` + Metrics *struct { + Address string `yaml:"address"` + Collect struct { + AppMetric struct { + Type string `yaml:"type"` + Help string `yaml:"help"` + Labels []string `yaml:"labels"` + Buckets []float64 `yaml:"buckets"` + Objectives []struct { + Num2 float64 `yaml:"2,omitempty"` + One4 float64 `yaml:"1.4,omitempty"` + } `yaml:"objectives"` + } `yaml:"app_metric"` + } `yaml:"collect"` + } `yaml:"metrics"` + Reload *struct { + Interval string `yaml:"interval"` + Patterns []string `yaml:"patterns"` + Services struct { + HTTP struct { + Recursive bool `yaml:"recursive"` + Ignore []string `yaml:"ignore"` + Patterns []string `yaml:"patterns"` + Dirs []string `yaml:"dirs"` + } `yaml:"http"` + } `yaml:"services"` + } `yaml:"reload"` } // InitDefaults for the server config func (cfg *Config) InitDefaults() { - if cfg.Relay == "" { - cfg.Relay = "pipes" + if cfg.Server.Relay == "" { + cfg.Server.Relay = "pipes" } - if cfg.RelayTimeout == 0 { - cfg.RelayTimeout = time.Second * 60 + if cfg.Server.RelayTimeout == 0 { + cfg.Server.RelayTimeout = time.Second * 60 } } diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index e4c91c9e..565c80c4 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -24,8 +24,12 @@ import ( // PluginName for the server const PluginName = "server" -// RR_RELAY env variable key +// RR_RELAY env variable key (internal) const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck +// RR_RPC env variable key (internal) if the RPC presents +const RR_RPC = "" //nolint:golint,stylecheck +// RR_HTTP env variable key (internal) if the HTTP presents +const RR_HTTP = "false" //nolint:golint,stylecheck // Plugin manages worker type Plugin struct { @@ -37,7 +41,7 @@ type Plugin struct { // Init application provider. func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("Init") - err := cfg.UnmarshalKey(PluginName, &server.cfg) + err := cfg.Unmarshal(&server.cfg) if err != nil { return errors.E(op, errors.Init, err) } @@ -46,7 +50,7 @@ func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { server.factory, err = server.initFactory() if err != nil { - return errors.E(errors.Op("Init factory"), err) + return errors.E(err) } return nil @@ -78,7 +82,7 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { var cmdArgs []string // create command according to the config - cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...) + cmdArgs = append(cmdArgs, strings.Split(server.cfg.Server.Command, " ")...) if len(cmdArgs) < 2 { return nil, errors.E(op, errors.Str("should be in form of `php <script>")) } @@ -96,8 +100,8 @@ func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { // if user is not empty, and OS is linux or macos // execute php worker from that particular user - if server.cfg.User != "" { - err := utils.ExecuteFromUser(cmd, server.cfg.User) + if server.cfg.Server.User != "" { + err := utils.ExecuteFromUser(cmd, server.cfg.Server.User) if err != nil { return nil } @@ -153,17 +157,17 @@ func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, en // creates relay and worker factory. func (server *Plugin) initFactory() (worker.Factory, error) { - const op = errors.Op("network factory init") - if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { + const op = errors.Op("server factory init") + if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" { return pipe.NewPipeFactory(), nil } - dsn := strings.Split(server.cfg.Relay, "://") + dsn := strings.Split(server.cfg.Server.Relay, "://") if len(dsn) != 2 { return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } - lsn, err := utils.CreateListener(server.cfg.Relay) + lsn, err := utils.CreateListener(server.cfg.Server.Relay) if err != nil { return nil, errors.E(op, errors.Network, err) } @@ -171,20 +175,35 @@ func (server *Plugin) initFactory() (worker.Factory, error) { switch dsn[0] { // sockets group case "unix": - return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil case "tcp": - return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil + return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil default: return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } func (server *Plugin) setEnv(e Env) []string { - env := append(os.Environ(), fmt.Sprintf(RR_RELAY+"=%s", server.cfg.Relay)) + env := append(os.Environ(), fmt.Sprintf(RR_RELAY+"=%s", server.cfg.Server.Relay)) for k, v := range e { env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) } + // set internal env variables + if server.cfg.HTTP != nil { + env = append(env, fmt.Sprintf("%s=%s", RR_HTTP, "true")) + } + if server.cfg.RPC != nil && server.cfg.RPC.Listen != "" { + env = append(env, fmt.Sprintf("%s=%s", RR_RPC, server.cfg.RPC.Listen)) + } + + // set env variables from the config + if len(server.cfg.Server.Env) > 0 { + for k, v := range server.cfg.Server.Env { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + } + return env } |