summaryrefslogtreecommitdiff
path: root/plugins/server
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-09-16 17:12:37 +0300
committerValery Piashchynski <[email protected]>2021-09-16 17:12:37 +0300
commitf3491c089b4da77fd8d2bc942a88b6b8d117a8a5 (patch)
tree32bfffb1f24eeee7b909747cc00a6a6b9fd3ee83 /plugins/server
parent5d2cd55ab522d4f1e65a833f91146444465a32ac (diff)
Move plugins to a separate repository
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/server')
-rw-r--r--plugins/server/command.go33
-rw-r--r--plugins/server/command_test.go43
-rw-r--r--plugins/server/config.go60
-rw-r--r--plugins/server/interface.go23
-rw-r--r--plugins/server/plugin.go268
5 files changed, 0 insertions, 427 deletions
diff --git a/plugins/server/command.go b/plugins/server/command.go
deleted file mode 100644
index b8bc1395..00000000
--- a/plugins/server/command.go
+++ /dev/null
@@ -1,33 +0,0 @@
-package server
-
-import (
- "os"
- "regexp"
-
- "github.com/spiral/errors"
-)
-
-// pattern for the path finding
-const pattern string = `^\/*([A-z/.:-]+\.(php|sh|ph))$`
-
-func (server *Plugin) scanCommand(cmd []string) error {
- const op = errors.Op("server_command_scan")
- r, err := regexp.Compile(pattern)
- if err != nil {
- return err
- }
-
- for i := 0; i < len(cmd); i++ {
- if r.MatchString(cmd[i]) {
- // try to stat
- _, err := os.Stat(cmd[i])
- if err != nil {
- return errors.E(op, errors.FileNotFound, err)
- }
-
- // stat successful
- return nil
- }
- }
- return errors.E(errors.Str("scan failed, possible path not found, this is not an error"), op)
-}
diff --git a/plugins/server/command_test.go b/plugins/server/command_test.go
deleted file mode 100644
index 74762ccd..00000000
--- a/plugins/server/command_test.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package server
-
-import (
- "strings"
- "testing"
-
- "github.com/spiral/errors"
- "github.com/stretchr/testify/assert"
-)
-
-func TestServerCommandChecker(t *testing.T) {
- s := &Plugin{}
- cmd1 := "php ../../tests/client.php"
- assert.NoError(t, s.scanCommand(strings.Split(cmd1, " ")))
-
- cmd2 := "C:/../../abcdef/client.php"
- assert.Error(t, s.scanCommand(strings.Split(cmd2, " ")))
-
- cmd3 := "sh ./script.sh"
- err := s.scanCommand(strings.Split(cmd3, " "))
- assert.Error(t, err)
- if !errors.Is(errors.FileNotFound, err) {
- t.Fatal("should be of filenotfound type")
- }
-
- cmd4 := "php ../../tests/client.php --option1 --option2"
- err = s.scanCommand(strings.Split(cmd4, " "))
- assert.NoError(t, err)
-
- cmd5 := "php ../../tests/cluent.php --option1 --option2"
- err = s.scanCommand(strings.Split(cmd5, " "))
- assert.Error(t, err)
- if !errors.Is(errors.FileNotFound, err) {
- t.Fatal("should be of filenotfound type")
- }
-
- cmd6 := "php 0/../../tests/cluent.php --option1 --option2"
- err = s.scanCommand(strings.Split(cmd6, " "))
- assert.Error(t, err)
- if errors.Is(errors.FileNotFound, err) {
- t.Fatal("should be of filenotfound type")
- }
-}
diff --git a/plugins/server/config.go b/plugins/server/config.go
deleted file mode 100644
index 00ce4140..00000000
--- a/plugins/server/config.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package server
-
-import (
- "time"
-)
-
-// Config All config (.rr.yaml)
-// For other section use pointer to distinguish between `empty` and `not present`
-type Config struct {
- // Server config section
- Server struct {
- // Command to run as application.
- Command string `mapstructure:"command"`
- // User to run application under.
- User string `mapstructure:"user"`
- // Group to run application under.
- Group string `mapstructure:"group"`
- // Env represents application environment.
- Env Env `mapstructure:"env"`
- // Relay defines connection method and factory to be used to connect to workers:
- // "pipes", "tcp://:6001", "unix://rr.sock"
- // This config section must not change on re-configuration.
- Relay string `mapstructure:"relay"`
- // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
- // must not change on re-configuration. Defaults to 60s.
- RelayTimeout time.Duration `mapstructure:"relay_timeout"`
- } `mapstructure:"server"`
-
- // we just need to know if the section exist, we don't need to read config from it
- RPC *struct {
- Listen string `mapstructure:"listen"`
- } `mapstructure:"rpc"`
- Logs *struct {
- } `mapstructure:"logs"`
- HTTP *struct {
- } `mapstructure:"http"`
- Redis *struct {
- } `mapstructure:"redis"`
- Boltdb *struct {
- } `mapstructure:"boltdb"`
- Memcached *struct {
- } `mapstructure:"memcached"`
- Memory *struct {
- } `mapstructure:"memory"`
- Metrics *struct {
- } `mapstructure:"metrics"`
- Reload *struct {
- } `mapstructure:"reload"`
-}
-
-// InitDefaults for the server config
-func (cfg *Config) InitDefaults() {
- if cfg.Server.Relay == "" {
- cfg.Server.Relay = "pipes"
- }
-
- if cfg.Server.RelayTimeout == 0 {
- cfg.Server.RelayTimeout = time.Second * 60
- }
-}
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
deleted file mode 100644
index b0f84a7f..00000000
--- a/plugins/server/interface.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package server
-
-import (
- "context"
- "os/exec"
-
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/worker"
-)
-
-// Env variables type alias
-type Env map[string]string
-
-// Server creates workers for the application.
-type Server interface {
- // CmdFactory return a new command based on the .rr.yaml server.command section
- CmdFactory(env Env) (func() *exec.Cmd, error)
- // NewWorker return a new worker with provided and attached by the user listeners and environment variables
- NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error)
- // NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration
- NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
-}
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
deleted file mode 100644
index 5f5f2df9..00000000
--- a/plugins/server/plugin.go
+++ /dev/null
@@ -1,268 +0,0 @@
-package server
-
-import (
- "context"
- "fmt"
- "os"
- "os/exec"
- "strings"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/transport"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-
- // core imports
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/transport/pipe"
- "github.com/spiral/roadrunner/v2/pkg/transport/socket"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-const (
- // PluginName for the server
- PluginName = "server"
- // RrRelay env variable key (internal)
- RrRelay = "RR_RELAY"
- // RrRPC env variable key (internal) if the RPC presents
- RrRPC = "RR_RPC"
-)
-
-// Plugin manages worker
-type Plugin struct {
- cfg Config
- log logger.Logger
- factory transport.Factory
-}
-
-// Init application provider.
-func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("server_plugin_init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
- err := cfg.Unmarshal(&server.cfg)
- if err != nil {
- return errors.E(op, errors.Init, err)
- }
- server.cfg.InitDefaults()
- server.log = log
-
- return nil
-}
-
-// Name contains service name.
-func (server *Plugin) Name() string {
- return PluginName
-}
-
-// Available interface implementation
-func (server *Plugin) Available() {}
-
-// Serve (Start) server plugin (just a mock here to satisfy interface)
-func (server *Plugin) Serve() chan error {
- const op = errors.Op("server_plugin_serve")
- errCh := make(chan error, 1)
- var err error
- server.factory, err = server.initFactory()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
- return errCh
-}
-
-// Stop used to close chosen in config factory
-func (server *Plugin) Stop() error {
- if server.factory == nil {
- return nil
- }
-
- return server.factory.Close()
-}
-
-// CmdFactory provides worker command factory associated with given context.
-func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
- const op = errors.Op("server_plugin_cmd_factory")
- var cmdArgs []string
-
- // create command according to the config
- cmdArgs = append(cmdArgs, strings.Split(server.cfg.Server.Command, " ")...)
- if len(cmdArgs) < 2 {
- return nil, errors.E(op, errors.Str("minimum command should be `<executable> <script>"))
- }
-
- // try to find a path here
- err := server.scanCommand(cmdArgs)
- if err != nil {
- server.log.Info("scan command", "reason", err)
- }
-
- return func() *exec.Cmd {
- cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec
- utils.IsolateProcess(cmd)
-
- // if user is not empty, and OS is linux or macos
- // execute php worker from that particular user
- if server.cfg.Server.User != "" {
- err := utils.ExecuteFromUser(cmd, server.cfg.Server.User)
- if err != nil {
- return nil
- }
- }
-
- cmd.Env = server.setEnv(env)
-
- return cmd
- }, nil
-}
-
-// NewWorker issues new standalone worker.
-func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) {
- const op = errors.Op("server_plugin_new_worker")
-
- list := make([]events.Listener, 0, len(listeners))
- list = append(list, server.collectWorkerEvents)
-
- spawnCmd, err := server.CmdFactory(env)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd(), list...)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return w, nil
-}
-
-// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt *pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) {
- const op = errors.Op("server_plugin_new_worker_pool")
-
- spawnCmd, err := server.CmdFactory(env)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- list := make([]events.Listener, 0, 2)
- list = append(list, server.collectPoolEvents, server.collectWorkerEvents)
- if len(listeners) != 0 {
- list = append(list, listeners...)
- }
-
- p, err := pool.Initialize(ctx, spawnCmd, server.factory, opt, pool.AddListeners(list...))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return p, nil
-}
-
-// creates relay and worker factory.
-func (server *Plugin) initFactory() (transport.Factory, error) {
- const op = errors.Op("server_plugin_init_factory")
- if server.cfg.Server.Relay == "" || server.cfg.Server.Relay == "pipes" {
- return pipe.NewPipeFactory(), nil
- }
-
- dsn := strings.Split(server.cfg.Server.Relay, "://")
- if len(dsn) != 2 {
- return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
- }
-
- lsn, err := utils.CreateListener(server.cfg.Server.Relay)
- if err != nil {
- return nil, errors.E(op, errors.Network, err)
- }
-
- switch dsn[0] {
- // sockets group
- case "unix":
- return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil
- case "tcp":
- return socket.NewSocketServer(lsn, server.cfg.Server.RelayTimeout), nil
- default:
- return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
- }
-}
-
-func (server *Plugin) setEnv(e Env) []string {
- env := append(os.Environ(), fmt.Sprintf(RrRelay+"=%s", server.cfg.Server.Relay))
- for k, v := range e {
- env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
- }
-
- if server.cfg.RPC != nil && server.cfg.RPC.Listen != "" {
- env = append(env, fmt.Sprintf("%s=%s", RrRPC, server.cfg.RPC.Listen))
- }
-
- // set env variables from the config
- if len(server.cfg.Server.Env) > 0 {
- for k, v := range server.cfg.Server.Env {
- env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
- }
- }
-
- return env
-}
-
-func (server *Plugin) collectPoolEvents(event interface{}) {
- if we, ok := event.(events.PoolEvent); ok {
- switch we.Event {
- case events.EventMaxMemory:
- server.log.Warn("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventNoFreeWorkers:
- server.log.Warn("no free workers in the pool", "error", we.Payload.(error).Error())
- case events.EventPoolError:
- server.log.Error("pool error", "error", we.Payload.(error).Error())
- case events.EventSupervisorError:
- server.log.Error("pool supervisor error", "error", we.Payload.(error).Error())
- case events.EventTTL:
- server.log.Warn("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventWorkerConstruct:
- if _, ok := we.Payload.(error); ok {
- server.log.Error("worker construction error", "error", we.Payload.(error).Error())
- return
- }
- server.log.Debug("worker constructed", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventWorkerDestruct:
- server.log.Debug("worker destructed", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventExecTTL:
- server.log.Warn("worker exec timeout reached", "error", we.Payload.(error).Error())
- case events.EventIdleTTL:
- server.log.Warn("worker idle timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventPoolRestart:
- server.log.Warn("requested pool restart")
- }
- }
-}
-
-func (server *Plugin) collectWorkerEvents(event interface{}) {
- if we, ok := event.(events.WorkerEvent); ok {
- switch we.Event {
- case events.EventWorkerError:
- switch e := we.Payload.(type) { //nolint:gocritic
- case error:
- if errors.Is(errors.SoftJob, e) {
- // get source error for the softjob error
- server.log.Error(strings.TrimRight(e.(*errors.Error).Err.Error(), " \n\t"))
- return
- }
-
- // print full error for the other types of errors
- server.log.Error(strings.TrimRight(e.Error(), " \n\t"))
- return
- }
- server.log.Error(strings.TrimRight(we.Payload.(error).Error(), " \n\t"))
- case events.EventWorkerLog:
- server.log.Debug(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
- // stderr event is INFO level
- case events.EventWorkerStderr:
- server.log.Info(strings.TrimRight(utils.AsString(we.Payload.([]byte)), " \n\t"))
- }
- }
-}