summaryrefslogtreecommitdiff
path: root/plugins/app/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-05 17:58:23 +0300
committerGitHub <[email protected]>2020-11-05 17:58:23 +0300
commit9fbe7726dd55cfedda724b7644e1b6bf7c1a6cb4 (patch)
treef2bf9b97d38103de51e2d140aa76666f9c6341c8 /plugins/app/plugin.go
parentedc45b3e24afdb5e56e74ffbbbd50e0e3b04922b (diff)
parent73da7300fcc9b8b22faa1c91fc1faff22ab944ff (diff)
Merge pull request #388 from spiral/enhancement/testsv2.0.0-alpha15
Tests for the new plugins
Diffstat (limited to 'plugins/app/plugin.go')
-rw-r--r--plugins/app/plugin.go177
1 files changed, 177 insertions, 0 deletions
diff --git a/plugins/app/plugin.go b/plugins/app/plugin.go
new file mode 100644
index 00000000..839685bd
--- /dev/null
+++ b/plugins/app/plugin.go
@@ -0,0 +1,177 @@
+package app
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "os/exec"
+ "strings"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/log"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/util"
+)
+
+const ServiceName = "app"
+
+type Env map[string]string
+
+// WorkerFactory creates workers for the application.
+type WorkerFactory interface {
+ CmdFactory(env Env) (func() *exec.Cmd, error)
+ NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
+ NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error)
+}
+
+// Plugin manages worker
+type Plugin struct {
+ cfg Config
+ log log.Logger
+ factory roadrunner.Factory
+}
+
+// Init application provider.
+func (app *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+ err := cfg.UnmarshalKey(ServiceName, &app.cfg)
+ if err != nil {
+ return err
+ }
+ app.cfg.InitDefaults()
+ app.log = log
+
+ return nil
+}
+
+// Name contains service name.
+func (app *Plugin) Name() string {
+ return ServiceName
+}
+
+func (app *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ var err error
+
+ app.factory, err = app.initFactory()
+ if err != nil {
+ errCh <- errors.E(errors.Op("init factory"), err)
+ }
+
+ return errCh
+}
+
+func (app *Plugin) Stop() error {
+ if app.factory == nil {
+ return nil
+ }
+
+ return app.factory.Close(context.Background())
+}
+
+// CmdFactory provides worker command factory assocated with given context.
+func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
+ var cmdArgs []string
+
+ // create command according to the config
+ cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...)
+
+ return func() *exec.Cmd {
+ cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
+ util.IsolateProcess(cmd)
+
+ // if user is not empty, and OS is linux or macos
+ // execute php worker from that particular user
+ if app.cfg.User != "" {
+ err := util.ExecuteFromUser(cmd, app.cfg.User)
+ if err != nil {
+ return nil
+ }
+ }
+
+ cmd.Env = app.setEnv(env)
+
+ return cmd
+ }, nil
+}
+
+// NewWorker issues new standalone worker.
+func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+ spawnCmd, err := app.CmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ w, err := app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+ if err != nil {
+ return nil, err
+ }
+
+ w.AddListener(app.collectLogs)
+
+ return w, nil
+}
+
+// NewWorkerPool issues new worker pool.
+func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
+ spawnCmd, err := app.CmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt)
+ if err != nil {
+ return nil, err
+ }
+
+ p.AddListener(app.collectLogs)
+
+ return p, nil
+}
+
+// creates relay and worker factory.
+func (app *Plugin) initFactory() (roadrunner.Factory, error) {
+ if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
+ return roadrunner.NewPipeFactory(), nil
+ }
+
+ dsn := strings.Split(app.cfg.Relay, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ }
+
+ lsn, err := util.CreateListener(app.cfg.Relay)
+ if err != nil {
+ return nil, err
+ }
+
+ switch dsn[0] {
+ // sockets group
+ case "unix":
+ return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ case "tcp":
+ return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ default:
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ }
+}
+
+func (app *Plugin) setEnv(e Env) []string {
+ env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay))
+ for k, v := range e {
+ env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
+ }
+
+ return env
+}
+
+func (app *Plugin) collectLogs(event interface{}) {
+ if we, ok := event.(roadrunner.WorkerEvent); ok {
+ switch we.Event {
+ case roadrunner.EventWorkerError:
+ app.log.Error(we.Payload.(error).Error(), "pid", we.Worker.Pid())
+ case roadrunner.EventWorkerLog:
+ app.log.Debug(strings.TrimRight(string(we.Payload.([]byte)), " \n\t"), "pid", we.Worker.Pid())
+ }
+ }
+}