summaryrefslogtreecommitdiff
path: root/plugins/server/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-25 14:46:01 +0300
committerValery Piashchynski <[email protected]>2020-12-25 14:46:01 +0300
commit8526c03822e724bc2ebb64b6197085fea335b782 (patch)
treeb205b392b3721606fae4fa3174327259b41bc76a /plugins/server/plugin.go
parent42b33b77793789d666451798b07587f6404242b4 (diff)
Move root plugins to the pkg
Diffstat (limited to 'plugins/server/plugin.go')
-rw-r--r--plugins/server/plugin.go229
1 files changed, 0 insertions, 229 deletions
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
deleted file mode 100644
index 3d90c95b..00000000
--- a/plugins/server/plugin.go
+++ /dev/null
@@ -1,229 +0,0 @@
-package server
-
-import (
- "context"
- "fmt"
- "os"
- "os/exec"
- "strings"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner-plugins/config"
- "github.com/spiral/roadrunner-plugins/logger"
-
- // core imports
- "github.com/spiral/roadrunner/v2/interfaces/events"
- "github.com/spiral/roadrunner/v2/interfaces/pool"
- "github.com/spiral/roadrunner/v2/interfaces/worker"
- "github.com/spiral/roadrunner/v2/pkg/pipe"
- poolImpl "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/socket"
- "github.com/spiral/roadrunner/v2/util"
-)
-
-const PluginName = "server"
-
-// Plugin manages worker
-type Plugin struct {
- cfg Config
- log logger.Logger
- factory worker.Factory
-}
-
-// Init application provider.
-func (server *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("Init")
- err := cfg.UnmarshalKey(PluginName, &server.cfg)
- if err != nil {
- return errors.E(op, errors.Init, err)
- }
- server.cfg.InitDefaults()
- server.log = log
-
- server.factory, err = server.initFactory()
- if err != nil {
- return errors.E(errors.Op("Init factory"), err)
- }
-
- return nil
-}
-
-// Name contains service name.
-func (server *Plugin) Name() string {
- return PluginName
-}
-
-func (server *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
- return errCh
-}
-
-func (server *Plugin) Stop() error {
- if server.factory == nil {
- return nil
- }
-
- return server.factory.Close()
-}
-
-// CmdFactory provides worker command factory associated with given context.
-func (server *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
- const op = errors.Op("cmd factory")
- var cmdArgs []string
-
- // create command according to the config
- cmdArgs = append(cmdArgs, strings.Split(server.cfg.Command, " ")...)
- if len(cmdArgs) < 2 {
- return nil, errors.E(op, errors.Str("should be in form of `php <script>"))
- }
- if cmdArgs[0] != "php" {
- return nil, errors.E(op, errors.Str("first arg in command should be `php`"))
- }
-
- _, err := os.Stat(cmdArgs[1])
- if err != nil {
- return nil, errors.E(op, err)
- }
- return func() *exec.Cmd {
- cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) //nolint:gosec
- util.IsolateProcess(cmd)
-
- // if user is not empty, and OS is linux or macos
- // execute php worker from that particular user
- if server.cfg.User != "" {
- err := util.ExecuteFromUser(cmd, server.cfg.User)
- if err != nil {
- return nil
- }
- }
-
- cmd.Env = server.setEnv(env)
-
- return cmd
- }, nil
-}
-
-// NewWorker issues new standalone worker.
-func (server *Plugin) NewWorker(ctx context.Context, env Env, listeners ...events.EventListener) (worker.BaseProcess, error) {
- const op = errors.Op("new worker")
-
- list := make([]events.EventListener, 0, len(listeners))
- list = append(list, server.collectWorkerLogs)
-
- spawnCmd, err := server.CmdFactory(env)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd(), list...)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return w, nil
-}
-
-// NewWorkerPool issues new worker pool.
-func (server *Plugin) NewWorkerPool(ctx context.Context, opt poolImpl.Config, env Env, listeners ...events.EventListener) (pool.Pool, error) {
- const op = errors.Op("server plugins new worker pool")
- spawnCmd, err := server.CmdFactory(env)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- list := make([]events.EventListener, 0, len(listeners))
- list = append(list, server.collectPoolLogs)
-
- p, err := poolImpl.Initialize(ctx, spawnCmd, server.factory, opt, poolImpl.AddListeners(list...))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return p, nil
-}
-
-// creates relay and worker factory.
-func (server *Plugin) initFactory() (worker.Factory, error) {
- const op = errors.Op("network factory init")
- if server.cfg.Relay == "" || server.cfg.Relay == "pipes" {
- return pipe.NewPipeFactory(), nil
- }
-
- dsn := strings.Split(server.cfg.Relay, "://")
- if len(dsn) != 2 {
- return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
- }
-
- lsn, err := util.CreateListener(server.cfg.Relay)
- if err != nil {
- return nil, errors.E(op, errors.Network, err)
- }
-
- switch dsn[0] {
- // sockets group
- case "unix":
- return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
- case "tcp":
- return socket.NewSocketServer(lsn, server.cfg.RelayTimeout), nil
- default:
- return nil, errors.E(op, errors.Network, errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
- }
-}
-
-func (server *Plugin) setEnv(e Env) []string {
- env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", server.cfg.Relay))
- for k, v := range e {
- env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
- }
-
- return env
-}
-
-func (server *Plugin) collectPoolLogs(event interface{}) {
- if we, ok := event.(events.PoolEvent); ok {
- switch we.Event {
- case events.EventMaxMemory:
- server.log.Info("worker max memory reached", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventNoFreeWorkers:
- server.log.Info("no free workers in pool", "error", we.Payload.(error).Error())
- case events.EventPoolError:
- server.log.Info("pool error", "error", we.Payload.(error).Error())
- case events.EventSupervisorError:
- server.log.Info("pool supervizor error", "error", we.Payload.(error).Error())
- case events.EventTTL:
- server.log.Info("worker TTL reached", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventWorkerConstruct:
- if _, ok := we.Payload.(error); ok {
- server.log.Error("worker construction error", "error", we.Payload.(error).Error())
- return
- }
- server.log.Info("worker constructed", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventWorkerDestruct:
- server.log.Info("worker destructed", "pid", we.Payload.(worker.BaseProcess).Pid())
- case events.EventExecTTL:
- server.log.Info("EVENT EXEC TTL PLACEHOLDER")
- case events.EventIdleTTL:
- server.log.Info("worker IDLE timeout reached", "pid", we.Payload.(worker.BaseProcess).Pid())
- }
- }
-
- if we, ok := event.(events.WorkerEvent); ok {
- switch we.Event {
- case events.EventWorkerError:
- server.log.Info(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
- case events.EventWorkerLog:
- server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
- }
- }
-}
-
-func (server *Plugin) collectWorkerLogs(event interface{}) {
- if we, ok := event.(events.WorkerEvent); ok {
- switch we.Event {
- case events.EventWorkerError:
- server.log.Error(we.Payload.(error).Error(), "pid", we.Worker.(worker.BaseProcess).Pid())
- case events.EventWorkerLog:
- server.log.Info(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.(worker.BaseProcess).Pid())
- }
- }
-}