diff options
author | Valery Piashchynski <[email protected]> | 2020-12-25 14:46:01 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-25 14:46:01 +0300 |
commit | 8526c03822e724bc2ebb64b6197085fea335b782 (patch) | |
tree | b205b392b3721606fae4fa3174327259b41bc76a /plugins/server/plugin.go | |
parent | 42b33b77793789d666451798b07587f6404242b4 (diff) |
Move root plugins to the pkg
Diffstat (limited to 'plugins/server/plugin.go')
-rw-r--r-- | plugins/server/plugin.go | 229 |
1 files changed, 0 insertions, 229 deletions
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go deleted file mode 100644 index 3d90c95b..00000000 --- a/plugins/server/plugin.go +++ /dev/null @@ -1,229 +0,0 @@ -package server - -import ( - "context" - "fmt" - "os" - "os/exec" - "strings" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner-plugins/config" - "github.com/spiral/roadrunner-plugins/logger" - - // core imports - "github.com/spiral/roadrunner/v2/interfaces/events" - "github.com/spiral/roadrunner/v2/interfaces/pool" - "github.com/spiral/roadrunner/v2/interfaces/worker" - "github.com/spiral/roadrunner/v2/pkg/pipe" - poolImpl "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/socket" - "github.com/spiral/roadrunner/v2/util" -) - -const PluginName = "server" - -// Plugin manages worker -type Plugin struct { - cfg Config - log logger.Logger - factory worker.Factory -} - -// Init application provider. -func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("Init") - err := cfg.UnmarshalKey(PluginName, &server.cfg) - if err != nil { - return errors.E(op, errors.Init, err) - } - server.cfg.InitDefaults() - server.log = log - - server.factory, err = server.initFactory() - if err != nil { - return errors.E(errors.Op("Init factory"), err) - } - - return nil -} - -// Name contains service name. -func (server *Plugin) Name() string { - return PluginName -} - -func (server *Plugin) Serve() chan error { - errCh := make(chan error, 1) - return errCh -} - -func (server *Plugin) Stop() error { - if server.factory == nil { - return nil - } - - return server.factory.Close() -} - -// CmdFactory provides worker command factory associated with given context. -func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { - const op = errors.Op("cmd factory") - var cmdArgs []string - - // create command according to the config - cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...) - if len(cmdArgs) < 2 { - return nil, errors.E(op, errors.Str("should be in form of `php <script>")) - } - if cmdArgs[0] != "php" { - return nil, errors.E(op, errors.Str("first arg in command should be `php`")) - } - - _, err := os.Stat(cmdArgs[1]) - if err != nil { - return nil, errors.E(op, err) - } - return func() *exec.Cmd { - cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec - util.IsolateProcess(cmd) - - // if user is not empty, and OS is linux or macos - // execute php worker from that particular user - if server.cfg.User != "" { - err := util.ExecuteFromUser(cmd, server.cfg.User) - if err != nil { - return nil - } - } - - cmd.Env = server.setEnv(env) - - return cmd - }, nil -} - -// NewWorker issues new standalone worker. -func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error) { - const op = errors.Op("new worker") - - list := make([]events.EventListener, 0, len(listeners)) - list = append(list, server.collectWorkerLogs) - - spawnCmd, err := server.CmdFactory(env) - if err != nil { - return nil, errors.E(op, err) - } - - w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd(), list...) - if err != nil { - return nil, errors.E(op, err) - } - - return w, nil -} - -// NewWorkerPool issues new worker pool. -func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error) { - const op = errors.Op("server plugins new worker pool") - spawnCmd, err := server.CmdFactory(env) - if err != nil { - return nil, errors.E(op, err) - } - - list := make([]events.EventListener, 0, len(listeners)) - list = append(list, server.collectPoolLogs) - - p, err := poolImpl.Initialize(ctx, spawnCmd, server.factory, opt, poolImpl.AddListeners(list...)) - if err != nil { - return nil, errors.E(op, err) - } - - return p, nil -} - -// creates relay and worker factory. -func (server *Plugin) initFactory() (worker.Factory, error) { - const op = errors.Op("network factory init") - if server.cfg.Relay == "" || server.cfg.Relay == "pipes" { - return pipe.NewPipeFactory(), nil - } - - dsn := strings.Split(server.cfg.Relay, "://") - if len(dsn) != 2 { - return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) - } - - lsn, err := util.CreateListener(server.cfg.Relay) - if err != nil { - return nil, errors.E(op, errors.Network, err) - } - - switch dsn[0] { - // sockets group - case "unix": - return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil - case "tcp": - return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil - default: - return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) - } -} - -func (server *Plugin) setEnv(e Env) []string { - env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay)) - for k, v := range e { - env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) - } - - return env -} - -func (server *Plugin) collectPoolLogs(event interface{}) { - if we, ok := event.(events.PoolEvent); ok { - switch we.Event { - case events.EventMaxMemory: - server.log.Info("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventNoFreeWorkers: - server.log.Info("no free workers in pool", "error", we.Payload.(error).Error()) - case events.EventPoolError: - server.log.Info("pool error", "error", we.Payload.(error).Error()) - case events.EventSupervisorError: - server.log.Info("pool supervizor error", "error", we.Payload.(error).Error()) - case events.EventTTL: - server.log.Info("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventWorkerConstruct: - if _, ok := we.Payload.(error); ok { - server.log.Error("worker construction error", "error", we.Payload.(error).Error()) - return - } - server.log.Info("worker constructed", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventWorkerDestruct: - server.log.Info("worker destructed", "pid", we.Payload.(worker.BaseProcess).Pid()) - case events.EventExecTTL: - server.log.Info("EVENT EXEC TTL PLACEHOLDER") - case events.EventIdleTTL: - server.log.Info("worker IDLE timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid()) - } - } - - if we, ok := event.(events.WorkerEvent); ok { - switch we.Event { - case events.EventWorkerError: - server.log.Info(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) - case events.EventWorkerLog: - server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) - } - } -} - -func (server *Plugin) collectWorkerLogs(event interface{}) { - if we, ok := event.(events.WorkerEvent); ok { - switch we.Event { - case events.EventWorkerError: - server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid()) - case events.EventWorkerLog: - server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid()) - } - } -} |