diff options
Diffstat (limited to 'plugins/server')
-rw-r--r-- | plugins/server/command.go | 33 | ||||
-rw-r--r-- | plugins/server/command_test.go | 43 | ||||
-rw-r--r-- | plugins/server/config.go | 60 | ||||
-rw-r--r-- | plugins/server/interface.go | 23 | ||||
-rw-r--r-- | plugins/server/plugin.go | 268 |
5 files changed, 0 insertions, 427 deletions
diff --git a/plugins/server/command.go b/plugins/server/command.go deleted file mode 100644 index b8bc1395..00000000 --- a/plugins/server/command.go +++ /dev/null @@ -1,33 +0,0 @@ -package server - -import ( - "os" - "regexp" - - "github.com/spiral/errors" -) - -// pattern for the path finding -const pattern string = `^\/*([A-z/.:-]+\.(php|sh|ph))$` - -func (server *Plugin) scanCommand(cmd []string) error { - const op = errors.Op("server_command_scan") - r, err := regexp.Compile(pattern) - if err != nil { - return err - } - - for i := 0; i < len(cmd); i++ { - if r.MatchString(cmd[i]) { - // try to stat - _, err := os.Stat(cmd[i]) - if err != nil { - return errors.E(op, errors.FileNotFound, err) - } - - // stat successful - return nil - } - } - return errors.E(errors.Str("scan failed, possible path not found, this is not an error"), op) -} diff --git a/plugins/server/command_test.go b/plugins/server/command_test.go deleted file mode 100644 index 74762ccd..00000000 --- a/plugins/server/command_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package server - -import ( - "strings" - "testing" - - "github.com/spiral/errors" - "github.com/stretchr/testify/assert" -) - -func TestServerCommandChecker(t *testing.T) { - s := &Plugin{} - cmd1 := "php ../../tests/client.php" - assert.NoError(t, s.scanCommand(strings.Split(cmd1, " "))) - - cmd2 := "C:/../../abcdef/client.php" - assert.Error(t, s.scanCommand(strings.Split(cmd2, " "))) - - cmd3 := "sh ./script.sh" - err := s.scanCommand(strings.Split(cmd3, " ")) - assert.Error(t, err) - if !errors.Is(errors.FileNotFound, err) { - t.Fatal("should be of filenotfound type") - } - - cmd4 := "php ../../tests/client.php --option1 --option2" - err = s.scanCommand(strings.Split(cmd4, " ")) - assert.NoError(t, err) - - cmd5 := "php ../../tests/cluent.php --option1 --option2" - err = s.scanCommand(strings.Split(cmd5, " ")) - assert.Error(t, err) - if !errors.Is(errors.FileNotFound, err) { - t.Fatal("should be of filenotfound type") - } - - cmd6 := "php 0/../../tests/cluent.php --option1 --option2" - err = s.scanCommand(strings.Split(cmd6, " ")) - assert.Error(t, err) - if errors.Is(errors.FileNotFound, err) { - t.Fatal("should be of filenotfound type") - } -} diff --git a/plugins/server/config.go b/plugins/server/config.go deleted file mode 100644 index 00ce4140..00000000 --- a/plugins/server/config.go +++ /dev/null @@ -1,60 +0,0 @@ -package server - -import ( - "time" -) - -// Config All config (.rr.yaml) -// For other section use pointer to distinguish between `empty` and `not present` -type Config struct { - // Server config section - Server struct { - // Command to run as application. - Command string `mapstructure:"command"` - // User to run application under. - User string `mapstructure:"user"` - // Group to run application under. - Group string `mapstructure:"group"` - // Env represents application environment. - Env Env `mapstructure:"env"` - // Relay defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string `mapstructure:"relay"` - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. Defaults to 60s. - RelayTimeout time.Duration `mapstructure:"relay_timeout"` - } `mapstructure:"server"` - - // we just need to know if the section exist, we don't need to read config from it - RPC *struct { - Listen string `mapstructure:"listen"` - } `mapstructure:"rpc"` - Logs *struct { - } `mapstructure:"logs"` - HTTP *struct { - } `mapstructure:"http"` - Redis *struct { - } `mapstructure:"redis"` - Boltdb *struct { - } `mapstructure:"boltdb"` - Memcached *struct { - } `mapstructure:"memcached"` - Memory *struct { - } `mapstructure:"memory"` - Metrics *struct { - } `mapstructure:"metrics"` - Reload *struct { - } `mapstructure:"reload"` -} - -// InitDefaults for the server config -func (cfg *Config) InitDefaults() { - if cfg.Server.Relay == "" { - cfg.Server.Relay = "pipes" - } - - if cfg.Server.RelayTimeout == 0 { - cfg.Server.RelayTimeout = time.Second * 60 - } -} diff --git a/plugins/server/interface.go b/plugins/server/interface.go deleted file mode 100644 index b0f84a7f..00000000 --- a/plugins/server/interface.go +++ /dev/null @@ -1,23 +0,0 @@ -package server - -import ( - "context" - "os/exec" - - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/worker" -) - -// Env variables type alias -type Env map[string]string - -// Server creates workers for the application. -type Server interface { - // CmdFactory return a new command based on the .rr.yaml server.command section - CmdFactory(env Env) (func() *exec.Cmd, error) - // NewWorker return a new worker with provided and attached by the user listeners and environment variables - NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) - // NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration - NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) -} diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go deleted file mode 100644 index 5f5f2df9..00000000 --- a/plugins/server/plugin.go +++ /dev/null @@ -1,268 +0,0 @@ -package server - -import ( - "context" - "fmt" - "os" - "os/exec" - "strings" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/transport" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - - // core imports - "github.com/spiral/roadrunner/v2/pkg/events" - "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - "github.com/spiral/roadrunner/v2/pkg/transport/socket" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/utils" -) - -const ( - // PluginName for the server - PluginName = "server" - // RrRelay env variable key (internal) - RrRelay = "RR_RELAY" - // RrRPC env variable key (internal) if the RPC presents - RrRPC = "RR_RPC" -) - -// Plugin manages worker -type Plugin struct { - cfg Config - log logger.Logger - factory transport.Factory -} - -// Init application provider. -func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("server_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - err := cfg.Unmarshal(&server.cfg) - if err != nil { - return errors.E(op, errors.Init, err) - } - server.cfg.InitDefaults() - server.log = log - - return nil -} - -// Name contains service name. -func (server *Plugin) Name() string { - return PluginName -} - -// Available interface implementation -func (server *Plugin) Available() {} - -// Serve (Start) server plugin (just a mock here to satisfy interface) -func (server *Plugin) Serve() chan error { - const op = errors.Op("server_plugin_serve") - errCh := make(chan error, 1) - var err error - server.factory, err = server.initFactory() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - return errCh -} - -// Stop used to close chosen in config factory -func (server *Plugin) Stop() error { - if server.factory == nil { - return nil - } - - return server.factory.Close() -} - -// CmdFactory provides worker command factory associated with given context. -func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { - const op = errors.Op("server_plugin_cmd_factory") - var cmdArgs []string - - // create command according to the config - cmdArgs = append(cmdArgs, strings.Split(server.cfg.Server.Command, " ")...) - if len(cmdArgs) < 2 { - return nil, errors.E(op, errors.Str("minimum command should be `<executable> <script>")) - } - - // try to find a path here - err := server.scanCommand(cmdArgs) - if err != nil { - server.log.Info("scan command", "reason", err) - } - - return func() *exec.Cmd { - cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec - utils.IsolateProcess(cmd) - - // if user is not empty, and OS is linux or macos - // execute php worker from that particular user - if server.cfg.Server.User != "" { - err := utils.ExecuteFromUser(cmd, server.cfg.Server.User) - if err != nil { - return nil - } - } - - cmd.Env = server.setEnv(env) - - return cmd - }, nil -} - -// NewWorker issues new standalone worker. -func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) { - const op = errors.Op("server_plugin_new_worker") - - list := make([]events.Listener, 0, len(listeners)) - list = append(list, server.collectWorkerEvents) - - spawnCmd, err := server.CmdFactory(env) - if err != nil { - return nil, errors.E(op, err) - } - - w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd(), list...) - if err != nil { - return nil, errors.E(op, err) - } - - return w, nil -} - -// NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) { - const op = errors.Op("server_plugin_new_worker_pool") - - spawnCmd, err := server.CmdFactory(env) - if err != nil { - return nil, errors.E(op, err) - } - - list := make([]events.Listener, 0, 2) - list = append(list, server.collectPoolEvents, server.collectWorkerEvents) - if len(listeners) != 0 { - list = append(list, listeners...) - } - - p, err := pool.Initialize(ctx, spawnCmd, server.factory, opt, pool.AddListeners(list...)) - if err != nil { - return nil, errors.E(op, err) - } - - return p, nil -} - -// creates relay and worker factory. -func (server *Plugin) initFactory() (transport.Factory, error) { - const op = errors.Op("server_plugin_init_factory") - if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" { - return pipe.NewPipeFactory(), nil - } - - dsn := strings.Split(server.cfg.Server.Relay, "://") - if len(dsn) != 2 { - return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) - } - - lsn, err := utils.CreateListener(server.cfg.Server.Relay) - if err != nil { - return nil, errors.E(op, errors.Network, err) - } - - switch dsn[0] { - // sockets group - case "unix": - return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil - case "tcp": - return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil - default: - return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) - } -} - -func (server *Plugin) setEnv(e Env) []string { - env := append(os.Environ(), fmt.Sprintf(RrRelay+"=%s", server.cfg.Server.Relay)) - for k, v := range e { - env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) - } - - if server.cfg.RPC != nil && server.cfg.RPC.Listen != "" { - env = append(env, fmt.Sprintf("%s=%s", RrRPC, server.cfg.RPC.Listen)) - } - - // set env variables from the config - if len(server.cfg.Server.Env) > 0 { - for k, v := range server.cfg.Server.Env { - env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) - } - } - - return env -} - -func (server *Plugin) collectPoolEvents(event interface{}) { - if we, ok := event.(events.PoolEvent); ok { - switch we.Event { - case events.EventMaxMemory: - server.log.Warn("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventNoFreeWorkers: - server.log.Warn("no free workers in the pool", "error", we.Payload.(error).Error()) - case events.EventPoolError: - server.log.Error("pool error", "error", we.Payload.(error).Error()) - case events.EventSupervisorError: - server.log.Error("pool supervisor error", "error", we.Payload.(error).Error()) - case events.EventTTL: - server.log.Warn("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventWorkerConstruct: - if _, ok := we.Payload.(error); ok { - server.log.Error("worker construction error", "error", we.Payload.(error).Error()) - return - } - server.log.Debug("worker constructed", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventWorkerDestruct: - server.log.Debug("worker destructed", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventExecTTL: - server.log.Warn("worker exec timeout reached", "error", we.Payload.(error).Error()) - case events.EventIdleTTL: - server.log.Warn("worker idle timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventPoolRestart: - server.log.Warn("requested pool restart") - } - } -} - -func (server *Plugin) collectWorkerEvents(event interface{}) { - if we, ok := event.(events.WorkerEvent); ok { - switch we.Event { - case events.EventWorkerError: - switch e := we.Payload.(type) { //nolint:gocritic - case error: - if errors.Is(errors.SoftJob, e) { - // get source error for the softjob error - server.log.Error(strings.TrimRight(e.(*errors.Error).Err.Error(), " \n\t")) - return - } - - // print full error for the other types of errors - server.log.Error(strings.TrimRight(e.Error(), " \n\t")) - return - } - server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t")) - case events.EventWorkerLog: - server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) - // stderr event is INFO level - case events.EventWorkerStderr: - server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t")) - } - } -} |