diff options
Diffstat (limited to 'plugins/server')
-rw-r--r-- | plugins/server/config.go | 147 | ||||
-rw-r--r-- | plugins/server/interface.go | 21 | ||||
-rw-r--r-- | plugins/server/plugin.go | 257 |
3 files changed, 425 insertions, 0 deletions
diff --git a/plugins/server/config.go b/plugins/server/config.go new file mode 100644 index 00000000..93b19226 --- /dev/null +++ b/plugins/server/config.go @@ -0,0 +1,147 @@ +package server + +import ( + "time" +) + +// All config (.rr.yaml) +// For other section use pointer to distinguish between `empty` and `not present` +type Config struct { + // Server config section + Server struct { + // Command to run as application. + Command string `mapstructure:"command"` + // User to run application under. + User string `mapstructure:"user"` + // Group to run application under. + Group string `mapstructure:"group"` + // Env represents application environment. + Env Env `mapstructure:"env"` + // Relay defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string `mapstructure:"relay"` + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration `mapstructure:"relayTimeout"` + } `mapstructure:"server"` + + RPC *struct { + Listen string `mapstructure:"listen"` + } `mapstructure:"rpc"` + Logs *struct { + Mode string `mapstructure:"mode"` + Level string `mapstructure:"level"` + } `mapstructure:"logs"` + HTTP *struct { + Address string `mapstructure:"address"` + MaxRequestSize int `mapstructure:"max_request_size"` + Middleware []string `mapstructure:"middleware"` + Uploads struct { + Forbid []string `mapstructure:"forbid"` + } `mapstructure:"uploads"` + TrustedSubnets []string `mapstructure:"trusted_subnets"` + Pool struct { + NumWorkers int `mapstructure:"num_workers"` + MaxJobs int `mapstructure:"max_jobs"` + AllocateTimeout string `mapstructure:"allocate_timeout"` + DestroyTimeout string `mapstructure:"destroy_timeout"` + Supervisor struct { + WatchTick int `mapstructure:"watch_tick"` + TTL int `mapstructure:"ttl"` + IdleTTL int `mapstructure:"idle_ttl"` + ExecTTL int `mapstructure:"exec_ttl"` + MaxWorkerMemory int `mapstructure:"max_worker_memory"` + } `mapstructure:"supervisor"` + } `mapstructure:"pool"` + Ssl struct { + Port int `mapstructure:"port"` + Redirect bool `mapstructure:"redirect"` + Cert string `mapstructure:"cert"` + Key string `mapstructure:"key"` + } `mapstructure:"ssl"` + Fcgi struct { + Address string `mapstructure:"address"` + } `mapstructure:"fcgi"` + HTTP2 struct { + Enabled bool `mapstructure:"enabled"` + H2C bool `mapstructure:"h2c"` + MaxConcurrentStreams int `mapstructure:"max_concurrent_streams"` + } `mapstructure:"http2"` + } `mapstructure:"http"` + Redis *struct { + Addrs []string `mapstructure:"addrs"` + MasterName string `mapstructure:"master_name"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + DB int `mapstructure:"db"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + DialTimeout int `mapstructure:"dial_timeout"` + MaxRetries int `mapstructure:"max_retries"` + MinRetryBackoff int `mapstructure:"min_retry_backoff"` + MaxRetryBackoff int `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge int `mapstructure:"max_conn_age"` + ReadTimeout int `mapstructure:"read_timeout"` + WriteTimeout int `mapstructure:"write_timeout"` + PoolTimeout int `mapstructure:"pool_timeout"` + IdleTimeout int `mapstructure:"idle_timeout"` + IdleCheckFreq int `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` + } `mapstructure:"redis"` + Boltdb *struct { + Dir string `mapstructure:"dir"` + File string `mapstructure:"file"` + Bucket string `mapstructure:"bucket"` + Permissions int `mapstructure:"permissions"` + TTL int `mapstructure:"TTL"` + } `mapstructure:"boltdb"` + Memcached *struct { + Addr []string `mapstructure:"addr"` + } `mapstructure:"memcached"` + Memory *struct { + Enabled bool `mapstructure:"enabled"` + Interval int `mapstructure:"interval"` + } `mapstructure:"memory"` + Metrics *struct { + Address string `mapstructure:"address"` + Collect struct { + AppMetric struct { + Type string `mapstructure:"type"` + Help string `mapstructure:"help"` + Labels []string `mapstructure:"labels"` + Buckets []float64 `mapstructure:"buckets"` + Objectives []struct { + Num2 float64 `mapstructure:"2,omitempty"` + One4 float64 `mapstructure:"1.4,omitempty"` + } `mapstructure:"objectives"` + } `mapstructure:"app_metric"` + } `mapstructure:"collect"` + } `mapstructure:"metrics"` + Reload *struct { + Interval string `mapstructure:"interval"` + Patterns []string `mapstructure:"patterns"` + Services struct { + HTTP struct { + Recursive bool `mapstructure:"recursive"` + Ignore []string `mapstructure:"ignore"` + Patterns []string `mapstructure:"patterns"` + Dirs []string `mapstructure:"dirs"` + } `mapstructure:"http"` + } `mapstructure:"services"` + } `mapstructure:"reload"` +} + +// InitDefaults for the server config +func (cfg *Config) InitDefaults() { + if cfg.Server.Relay == "" { + cfg.Server.Relay = "pipes" + } + + if cfg.Server.RelayTimeout == 0 { + cfg.Server.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/server/interface.go b/plugins/server/interface.go new file mode 100644 index 00000000..a2d8b92b --- /dev/null +++ b/plugins/server/interface.go @@ -0,0 +1,21 @@ +package server + +import ( + "context" + "os/exec" + + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" +) + +// Env variables type alias +type Env map[string]string + +// Server creates workers for the application. +type Server interface { + CmdFactory(env Env) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) + NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error) +} diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go new file mode 100644 index 00000000..8c39b783 --- /dev/null +++ b/plugins/server/plugin.go @@ -0,0 +1,257 @@ +package server + +import ( + "context" + "fmt" + "os" + "os/exec" + "strings" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + + // core imports + "github.com/spiral/roadrunner/v2/interfaces/events" + "github.com/spiral/roadrunner/v2/interfaces/pool" + "github.com/spiral/roadrunner/v2/interfaces/worker" + "github.com/spiral/roadrunner/v2/pkg/pipe" + poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" + "github.com/spiral/roadrunner/v2/pkg/socket" + "github.com/spiral/roadrunner/v2/utils" +) + +// PluginName for the server +const PluginName = "server" + +// RR_RELAY env variable key (internal) +const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck +// RR_RPC env variable key (internal) if the RPC presents +const RR_RPC = "" //nolint:golint,stylecheck +// RR_HTTP env variable key (internal) if the HTTP presents +const RR_HTTP = "false" //nolint:golint,stylecheck + +// Plugin manages worker +type Plugin struct { + cfg Config + log logger.Logger + factory worker.Factory +} + +// Init application provider. +func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("server plugin init") + err := cfg.Unmarshal(&server.cfg) + if err != nil { + return errors.E(op, errors.Init, err) + } + server.cfg.InitDefaults() + server.log = log + + server.factory, err = server.initFactory() + if err != nil { + return errors.E(err) + } + + return nil +} + +// Name contains service name. +func (server *Plugin) Name() string { + return PluginName +} + +// Serve (Start) server plugin (just a mock here to satisfy interface) +func (server *Plugin) Serve() chan error { + errCh := make(chan error, 1) + return errCh +} + +// Stop used to close chosen in config factory +func (server *Plugin) Stop() error { + if server.factory == nil { + return nil + } + + return server.factory.Close() +} + +// CmdFactory provides worker command factory associated with given context. +func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { + const op = errors.Op("cmd factory") + var cmdArgs []string + + // create command according to the config + cmdArgs = append(cmdArgs, strings.Split(server.cfg.Server.Command, " ")...) + if len(cmdArgs) < 2 { + return nil, errors.E(op, errors.Str("should be in form of `php <script>")) + } + if cmdArgs[0] != "php" { + return nil, errors.E(op, errors.Str("first arg in command should be `php`")) + } + + _, err := os.Stat(cmdArgs[1]) + if err != nil { + return nil, errors.E(op, err) + } + return func() *exec.Cmd { + cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec + utils.IsolateProcess(cmd) + + // if user is not empty, and OS is linux or macos + // execute php worker from that particular user + if server.cfg.Server.User != "" { + err := utils.ExecuteFromUser(cmd, server.cfg.Server.User) + if err != nil { + return nil + } + } + + cmd.Env = server.setEnv(env) + + return cmd + }, nil +} + +// NewWorker issues new standalone worker. +func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) { + const op = errors.Op("new worker") + + list := make([]events.Listener, 0, len(listeners)) + list = append(list, server.collectWorkerLogs) + + spawnCmd, err := server.CmdFactory(env) + if err != nil { + return nil, errors.E(op, err) + } + + w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd(), list...) + if err != nil { + return nil, errors.E(op, err) + } + + return w, nil +} + +// NewWorkerPool issues new worker pool. +func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { + const op = errors.Op("server plugins new worker pool") + spawnCmd, err := server.CmdFactory(env) + if err != nil { + return nil, errors.E(op, err) + } + + list := make([]events.Listener, 0, 1) + list = append(list, server.collectPoolLogs) + if len(listeners) != 0 { + list = append(list, listeners...) + } + + p, err := poolImpl.Initialize(ctx, spawnCmd, server.factory, opt, poolImpl.AddListeners(list...)) + if err != nil { + return nil, errors.E(op, err) + } + + return p, nil +} + +// creates relay and worker factory. +func (server *Plugin) initFactory() (worker.Factory, error) { + const op = errors.Op("server factory init") + if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" { + return pipe.NewPipeFactory(), nil + } + + dsn := strings.Split(server.cfg.Server.Relay, "://") + if len(dsn) != 2 { + return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + } + + lsn, err := utils.CreateListener(server.cfg.Server.Relay) + if err != nil { + return nil, errors.E(op, errors.Network, err) + } + + switch dsn[0] { + // sockets group + case "unix": + return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil + case "tcp": + return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil + default: + return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + } +} + +func (server *Plugin) setEnv(e Env) []string { + env := append(os.Environ(), fmt.Sprintf(RR_RELAY+"=%s", server.cfg.Server.Relay)) + for k, v := range e { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + + // set internal env variables + if server.cfg.HTTP != nil { + env = append(env, fmt.Sprintf("%s=%s", RR_HTTP, "true")) + } + if server.cfg.RPC != nil && server.cfg.RPC.Listen != "" { + env = append(env, fmt.Sprintf("%s=%s", RR_RPC, server.cfg.RPC.Listen)) + } + + // set env variables from the config + if len(server.cfg.Server.Env) > 0 { + for k, v := range server.cfg.Server.Env { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + } + + return env +} + +func (server *Plugin) collectPoolLogs(event interface{}) { + if we, ok := event.(events.PoolEvent); ok { + switch we.Event { + case events.EventMaxMemory: + server.log.Info("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid()) + case events.EventNoFreeWorkers: + server.log.Info("no free workers in pool", "error", we.Payload.(error).Error()) + case events.EventPoolError: + server.log.Info("pool error", "error", we.Payload.(error).Error()) + case events.EventSupervisorError: + server.log.Info("pool supervisor error", "error", we.Payload.(error).Error()) + case events.EventTTL: + server.log.Info("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid()) + case events.EventWorkerConstruct: + if _, ok := we.Payload.(error); ok { + server.log.Error("worker construction error", "error", we.Payload.(error).Error()) + return + } + server.log.Info("worker constructed", "pid", we.Payload.(worker.BaseProcess).Pid()) + case events.EventWorkerDestruct: + server.log.Info("worker destructed", "pid", we.Payload.(worker.BaseProcess).Pid()) + case events.EventExecTTL: + server.log.Info("EVENT EXEC TTL PLACEHOLDER") + case events.EventIdleTTL: + server.log.Info("worker IDLE timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid()) + } + } + + if we, ok := event.(events.WorkerEvent); ok { + switch we.Event { + case events.EventWorkerError: + server.log.Info(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) + case events.EventWorkerLog: + server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) + } + } +} + +func (server *Plugin) collectWorkerLogs(event interface{}) { + if we, ok := event.(events.WorkerEvent); ok { + switch we.Event { + case events.EventWorkerError: + server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) + case events.EventWorkerLog: + server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) + } + } +} |