summaryrefslogtreecommitdiff
path: root/plugins/server
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/server')
-rw-r--r--plugins/server/config.go147
-rw-r--r--plugins/server/interface.go21
-rw-r--r--plugins/server/plugin.go257
3 files changed, 425 insertions, 0 deletions
diff --git a/plugins/server/config.go b/plugins/server/config.go
new file mode 100644
index 00000000..93b19226
--- /dev/null
+++ b/plugins/server/config.go
@@ -0,0 +1,147 @@
+package server
+
+import (
+ "time"
+)
+
+// All config (.rr.yaml)
+// For other section use pointer to distinguish between `empty` and `not present`
+type Config struct {
+ // Server config section
+ Server struct {
+ // Command to run as application.
+ Command string `mapstructure:"command"`
+ // User to run application under.
+ User string `mapstructure:"user"`
+ // Group to run application under.
+ Group string `mapstructure:"group"`
+ // Env represents application environment.
+ Env Env `mapstructure:"env"`
+ // Relay defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string `mapstructure:"relay"`
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration. Defaults to 60s.
+ RelayTimeout time.Duration `mapstructure:"relayTimeout"`
+ } `mapstructure:"server"`
+
+ RPC *struct {
+ Listen string `mapstructure:"listen"`
+ } `mapstructure:"rpc"`
+ Logs *struct {
+ Mode string `mapstructure:"mode"`
+ Level string `mapstructure:"level"`
+ } `mapstructure:"logs"`
+ HTTP *struct {
+ Address string `mapstructure:"address"`
+ MaxRequestSize int `mapstructure:"max_request_size"`
+ Middleware []string `mapstructure:"middleware"`
+ Uploads struct {
+ Forbid []string `mapstructure:"forbid"`
+ } `mapstructure:"uploads"`
+ TrustedSubnets []string `mapstructure:"trusted_subnets"`
+ Pool struct {
+ NumWorkers int `mapstructure:"num_workers"`
+ MaxJobs int `mapstructure:"max_jobs"`
+ AllocateTimeout string `mapstructure:"allocate_timeout"`
+ DestroyTimeout string `mapstructure:"destroy_timeout"`
+ Supervisor struct {
+ WatchTick int `mapstructure:"watch_tick"`
+ TTL int `mapstructure:"ttl"`
+ IdleTTL int `mapstructure:"idle_ttl"`
+ ExecTTL int `mapstructure:"exec_ttl"`
+ MaxWorkerMemory int `mapstructure:"max_worker_memory"`
+ } `mapstructure:"supervisor"`
+ } `mapstructure:"pool"`
+ Ssl struct {
+ Port int `mapstructure:"port"`
+ Redirect bool `mapstructure:"redirect"`
+ Cert string `mapstructure:"cert"`
+ Key string `mapstructure:"key"`
+ } `mapstructure:"ssl"`
+ Fcgi struct {
+ Address string `mapstructure:"address"`
+ } `mapstructure:"fcgi"`
+ HTTP2 struct {
+ Enabled bool `mapstructure:"enabled"`
+ H2C bool `mapstructure:"h2c"`
+ MaxConcurrentStreams int `mapstructure:"max_concurrent_streams"`
+ } `mapstructure:"http2"`
+ } `mapstructure:"http"`
+ Redis *struct {
+ Addrs []string `mapstructure:"addrs"`
+ MasterName string `mapstructure:"master_name"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ DB int `mapstructure:"db"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ DialTimeout int `mapstructure:"dial_timeout"`
+ MaxRetries int `mapstructure:"max_retries"`
+ MinRetryBackoff int `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff int `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge int `mapstructure:"max_conn_age"`
+ ReadTimeout int `mapstructure:"read_timeout"`
+ WriteTimeout int `mapstructure:"write_timeout"`
+ PoolTimeout int `mapstructure:"pool_timeout"`
+ IdleTimeout int `mapstructure:"idle_timeout"`
+ IdleCheckFreq int `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+ } `mapstructure:"redis"`
+ Boltdb *struct {
+ Dir string `mapstructure:"dir"`
+ File string `mapstructure:"file"`
+ Bucket string `mapstructure:"bucket"`
+ Permissions int `mapstructure:"permissions"`
+ TTL int `mapstructure:"TTL"`
+ } `mapstructure:"boltdb"`
+ Memcached *struct {
+ Addr []string `mapstructure:"addr"`
+ } `mapstructure:"memcached"`
+ Memory *struct {
+ Enabled bool `mapstructure:"enabled"`
+ Interval int `mapstructure:"interval"`
+ } `mapstructure:"memory"`
+ Metrics *struct {
+ Address string `mapstructure:"address"`
+ Collect struct {
+ AppMetric struct {
+ Type string `mapstructure:"type"`
+ Help string `mapstructure:"help"`
+ Labels []string `mapstructure:"labels"`
+ Buckets []float64 `mapstructure:"buckets"`
+ Objectives []struct {
+ Num2 float64 `mapstructure:"2,omitempty"`
+ One4 float64 `mapstructure:"1.4,omitempty"`
+ } `mapstructure:"objectives"`
+ } `mapstructure:"app_metric"`
+ } `mapstructure:"collect"`
+ } `mapstructure:"metrics"`
+ Reload *struct {
+ Interval string `mapstructure:"interval"`
+ Patterns []string `mapstructure:"patterns"`
+ Services struct {
+ HTTP struct {
+ Recursive bool `mapstructure:"recursive"`
+ Ignore []string `mapstructure:"ignore"`
+ Patterns []string `mapstructure:"patterns"`
+ Dirs []string `mapstructure:"dirs"`
+ } `mapstructure:"http"`
+ } `mapstructure:"services"`
+ } `mapstructure:"reload"`
+}
+
+// InitDefaults for the server config
+func (cfg *Config) InitDefaults() {
+ if cfg.Server.Relay == "" {
+ cfg.Server.Relay = "pipes"
+ }
+
+ if cfg.Server.RelayTimeout == 0 {
+ cfg.Server.RelayTimeout = time.Second * 60
+ }
+}
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
new file mode 100644
index 00000000..a2d8b92b
--- /dev/null
+++ b/plugins/server/interface.go
@@ -0,0 +1,21 @@
+package server
+
+import (
+ "context"
+ "os/exec"
+
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+)
+
+// Env variables type alias
+type Env map[string]string
+
+// Server creates workers for the application.
+type Server interface {
+ CmdFactory(env Env) (func() *exec.Cmd, error)
+ NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error)
+ NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
+}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
new file mode 100644
index 00000000..8c39b783
--- /dev/null
+++ b/plugins/server/plugin.go
@@ -0,0 +1,257 @@
+package server
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "os/exec"
+ "strings"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+
+ // core imports
+ "github.com/spiral/roadrunner/v2/interfaces/events"
+ "github.com/spiral/roadrunner/v2/interfaces/pool"
+ "github.com/spiral/roadrunner/v2/interfaces/worker"
+ "github.com/spiral/roadrunner/v2/pkg/pipe"
+ poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
+ "github.com/spiral/roadrunner/v2/pkg/socket"
+ "github.com/spiral/roadrunner/v2/utils"
+)
+
+// PluginName for the server
+const PluginName = "server"
+
+// RR_RELAY env variable key (internal)
+const RR_RELAY = "RR_RELAY" //nolint:golint,stylecheck
+// RR_RPC env variable key (internal) if the RPC presents
+const RR_RPC = "" //nolint:golint,stylecheck
+// RR_HTTP env variable key (internal) if the HTTP presents
+const RR_HTTP = "false" //nolint:golint,stylecheck
+
+// Plugin manages worker
+type Plugin struct {
+ cfg Config
+ log logger.Logger
+ factory worker.Factory
+}
+
+// Init application provider.
+func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("server plugin init")
+ err := cfg.Unmarshal(&server.cfg)
+ if err != nil {
+ return errors.E(op, errors.Init, err)
+ }
+ server.cfg.InitDefaults()
+ server.log = log
+
+ server.factory, err = server.initFactory()
+ if err != nil {
+ return errors.E(err)
+ }
+
+ return nil
+}
+
+// Name contains service name.
+func (server *Plugin) Name() string {
+ return PluginName
+}
+
+// Serve (Start) server plugin (just a mock here to satisfy interface)
+func (server *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ return errCh
+}
+
+// Stop used to close chosen in config factory
+func (server *Plugin) Stop() error {
+ if server.factory == nil {
+ return nil
+ }
+
+ return server.factory.Close()
+}
+
+// CmdFactory provides worker command factory associated with given context.
+func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
+ const op = errors.Op("cmd factory")
+ var cmdArgs []string
+
+ // create command according to the config
+ cmdArgs = append(cmdArgs, strings.Split(server.cfg.Server.Command, " ")...)
+ if len(cmdArgs) < 2 {
+ return nil, errors.E(op, errors.Str("should be in form of `php <script>"))
+ }
+ if cmdArgs[0] != "php" {
+ return nil, errors.E(op, errors.Str("first arg in command should be `php`"))
+ }
+
+ _, err := os.Stat(cmdArgs[1])
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ return func() *exec.Cmd {
+ cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec
+ utils.IsolateProcess(cmd)
+
+ // if user is not empty, and OS is linux or macos
+ // execute php worker from that particular user
+ if server.cfg.Server.User != "" {
+ err := utils.ExecuteFromUser(cmd, server.cfg.Server.User)
+ if err != nil {
+ return nil
+ }
+ }
+
+ cmd.Env = server.setEnv(env)
+
+ return cmd
+ }, nil
+}
+
+// NewWorker issues new standalone worker.
+func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (worker.BaseProcess, error) {
+ const op = errors.Op("new worker")
+
+ list := make([]events.Listener, 0, len(listeners))
+ list = append(list, server.collectWorkerLogs)
+
+ spawnCmd, err := server.CmdFactory(env)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd(), list...)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return w, nil
+}
+
+// NewWorkerPool issues new worker pool.
+func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
+ const op = errors.Op("server plugins new worker pool")
+ spawnCmd, err := server.CmdFactory(env)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ list := make([]events.Listener, 0, 1)
+ list = append(list, server.collectPoolLogs)
+ if len(listeners) != 0 {
+ list = append(list, listeners...)
+ }
+
+ p, err := poolImpl.Initialize(ctx, spawnCmd, server.factory, opt, poolImpl.AddListeners(list...))
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ return p, nil
+}
+
+// creates relay and worker factory.
+func (server *Plugin) initFactory() (worker.Factory, error) {
+ const op = errors.Op("server factory init")
+ if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" {
+ return pipe.NewPipeFactory(), nil
+ }
+
+ dsn := strings.Split(server.cfg.Server.Relay, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ }
+
+ lsn, err := utils.CreateListener(server.cfg.Server.Relay)
+ if err != nil {
+ return nil, errors.E(op, errors.Network, err)
+ }
+
+ switch dsn[0] {
+ // sockets group
+ case "unix":
+ return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil
+ case "tcp":
+ return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil
+ default:
+ return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ }
+}
+
+func (server *Plugin) setEnv(e Env) []string {
+ env := append(os.Environ(), fmt.Sprintf(RR_RELAY+"=%s", server.cfg.Server.Relay))
+ for k, v := range e {
+ env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
+ }
+
+ // set internal env variables
+ if server.cfg.HTTP != nil {
+ env = append(env, fmt.Sprintf("%s=%s", RR_HTTP, "true"))
+ }
+ if server.cfg.RPC != nil && server.cfg.RPC.Listen != "" {
+ env = append(env, fmt.Sprintf("%s=%s", RR_RPC, server.cfg.RPC.Listen))
+ }
+
+ // set env variables from the config
+ if len(server.cfg.Server.Env) > 0 {
+ for k, v := range server.cfg.Server.Env {
+ env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
+ }
+ }
+
+ return env
+}
+
+func (server *Plugin) collectPoolLogs(event interface{}) {
+ if we, ok := event.(events.PoolEvent); ok {
+ switch we.Event {
+ case events.EventMaxMemory:
+ server.log.Info("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid())
+ case events.EventNoFreeWorkers:
+ server.log.Info("no free workers in pool", "error", we.Payload.(error).Error())
+ case events.EventPoolError:
+ server.log.Info("pool error", "error", we.Payload.(error).Error())
+ case events.EventSupervisorError:
+ server.log.Info("pool supervisor error", "error", we.Payload.(error).Error())
+ case events.EventTTL:
+ server.log.Info("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid())
+ case events.EventWorkerConstruct:
+ if _, ok := we.Payload.(error); ok {
+ server.log.Error("worker construction error", "error", we.Payload.(error).Error())
+ return
+ }
+ server.log.Info("worker constructed", "pid", we.Payload.(worker.BaseProcess).Pid())
+ case events.EventWorkerDestruct:
+ server.log.Info("worker destructed", "pid", we.Payload.(worker.BaseProcess).Pid())
+ case events.EventExecTTL:
+ server.log.Info("EVENT EXEC TTL PLACEHOLDER")
+ case events.EventIdleTTL:
+ server.log.Info("worker IDLE timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid())
+ }
+ }
+
+ if we, ok := event.(events.WorkerEvent); ok {
+ switch we.Event {
+ case events.EventWorkerError:
+ server.log.Info(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
+ case events.EventWorkerLog:
+ server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
+ }
+ }
+}
+
+func (server *Plugin) collectWorkerLogs(event interface{}) {
+ if we, ok := event.(events.WorkerEvent); ok {
+ switch we.Event {
+ case events.EventWorkerError:
+ server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
+ case events.EventWorkerLog:
+ server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
+ }
+ }
+}