summaryrefslogtreecommitdiff
path: root/plugins/app
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/app')
-rw-r--r--plugins/app/app.go169
-rw-r--r--plugins/app/config.go37
-rw-r--r--plugins/app/tests/.rr.yaml9
-rw-r--r--plugins/app/tests/factory_test.go78
-rw-r--r--plugins/app/tests/hello.php1
-rw-r--r--plugins/app/tests/plugin_1.go55
-rw-r--r--plugins/app/tests/plugin_2.go88
7 files changed, 437 insertions, 0 deletions
diff --git a/plugins/app/app.go b/plugins/app/app.go
new file mode 100644
index 00000000..ebb42631
--- /dev/null
+++ b/plugins/app/app.go
@@ -0,0 +1,169 @@
+package app
+
+import (
+ "context"
+ "fmt"
+ "go.uber.org/zap"
+ "log"
+ "os"
+ "os/exec"
+ "strings"
+
+ "github.com/fatih/color"
+ "github.com/spiral/endure/errors"
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/util"
+)
+
+const ServiceName = "app"
+
+type Env map[string]string
+
+// WorkerFactory creates workers for the application.
+type WorkerFactory interface {
+ CmdFactory(env Env) (func() *exec.Cmd, error)
+ NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
+ NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error)
+}
+
+// App manages worker
+type App struct {
+ cfg Config
+ log *zap.Logger
+ factory roadrunner.Factory
+}
+
+// Init application provider.
+func (app *App) Init(cfg config.Provider, log *zap.Logger) error {
+ err := cfg.UnmarshalKey(ServiceName, &app.cfg)
+ if err != nil {
+ return err
+ }
+ app.cfg.InitDefaults()
+ app.log = log
+
+ return nil
+}
+
+// Name contains service name.
+func (app *App) Name() string {
+ return ServiceName
+}
+
+func (app *App) Serve() chan error {
+ errCh := make(chan error, 1)
+ var err error
+
+ app.factory, err = app.initFactory()
+ if err != nil {
+ errCh <- errors.E(errors.Op("init factory"), err)
+ }
+
+ app.log.Info("Started worker factory", zap.Any("relay", app.cfg.Relay), zap.Any("command", app.cfg.Command))
+
+ return errCh
+}
+
+func (app *App) Stop() error {
+ if app.factory == nil {
+ return nil
+ }
+
+ return app.factory.Close(context.Background())
+}
+
+// CmdFactory provides worker command factory assocated with given context.
+func (app *App) CmdFactory(env Env) (func() *exec.Cmd, error) {
+ var cmdArgs []string
+
+ // create command according to the config
+ cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...)
+
+ return func() *exec.Cmd {
+ cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
+ util.IsolateProcess(cmd)
+
+ // if user is not empty, and OS is linux or macos
+ // execute php worker from that particular user
+ if app.cfg.User != "" {
+ err := util.ExecuteFromUser(cmd, app.cfg.User)
+ if err != nil {
+ return nil
+ }
+ }
+
+ cmd.Env = app.setEnv(env)
+
+ return cmd
+ }, nil
+}
+
+// NewWorker issues new standalone worker.
+func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+ spawnCmd, err := app.CmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ return app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+}
+
+// NewWorkerPool issues new worker pool.
+func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
+ spawnCmd, err := app.CmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt)
+ if err != nil {
+ return nil, err
+ }
+
+ p.AddListener(func(event interface{}) {
+ if we, ok := event.(roadrunner.WorkerEvent); ok {
+ if we.Event == roadrunner.EventWorkerLog {
+ log.Print(color.YellowString(string(we.Payload.([]byte))))
+ }
+ }
+ })
+
+ return p, nil
+}
+
+// creates relay and worker factory.
+func (app *App) initFactory() (roadrunner.Factory, error) {
+ if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
+ return roadrunner.NewPipeFactory(), nil
+ }
+
+ dsn := strings.Split(app.cfg.Relay, "://")
+ if len(dsn) != 2 {
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ }
+
+ lsn, err := util.CreateListener(app.cfg.Relay)
+ if err != nil {
+ return nil, err
+ }
+
+ switch dsn[0] {
+ // sockets group
+ case "unix":
+ return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ case "tcp":
+ return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ default:
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
+ }
+}
+
+func (app *App) setEnv(e Env) []string {
+ env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay))
+ for k, v := range e {
+ env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
+ }
+
+ return env
+}
diff --git a/plugins/app/config.go b/plugins/app/config.go
new file mode 100644
index 00000000..eaa54e2d
--- /dev/null
+++ b/plugins/app/config.go
@@ -0,0 +1,37 @@
+package app
+
+import "time"
+
+// Config config combines factory, pool and cmd configurations.
+type Config struct {
+ // Command to run as application.
+ Command string
+
+ // User to run application under.
+ User string
+
+ // Group to run application under.
+ Group string
+
+ // Env represents application environment.
+ Env Env
+
+ // Listen defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string
+
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration. Defaults to 60s.
+ RelayTimeout time.Duration
+}
+
+func (cfg *Config) InitDefaults() {
+ if cfg.Relay == "" {
+ cfg.Relay = "pipes"
+ }
+
+ if cfg.RelayTimeout == 0 {
+ cfg.RelayTimeout = time.Second * 60
+ }
+}
diff --git a/plugins/app/tests/.rr.yaml b/plugins/app/tests/.rr.yaml
new file mode 100644
index 00000000..171f51dc
--- /dev/null
+++ b/plugins/app/tests/.rr.yaml
@@ -0,0 +1,9 @@
+app:
+ command: "php hello.php"
+ user: ""
+ group: ""
+ env:
+ "RR_CONFIG": "/some/place/on/the/C134"
+ "RR_CONFIG2": "C138"
+ relay: "pipes"
+ relayTimeout: "20s" \ No newline at end of file
diff --git a/plugins/app/tests/factory_test.go b/plugins/app/tests/factory_test.go
new file mode 100644
index 00000000..7c885797
--- /dev/null
+++ b/plugins/app/tests/factory_test.go
@@ -0,0 +1,78 @@
+package tests
+
+import (
+ "os"
+ "os/signal"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestFactory(t *testing.T) {
+ container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // config plugin
+ vp := &config.ViperProvider{}
+ vp.Path = ".rr.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&app.App{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo2{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ errCh, err := container.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // stop by CTRL+C
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt)
+
+ tt := time.NewTicker(time.Second * 2)
+
+ for {
+ select {
+ case e := <-errCh:
+ assert.NoError(t, e.Error)
+ assert.NoError(t, container.Stop())
+ return
+ case <-c:
+ er := container.Stop()
+ if er != nil {
+ panic(er)
+ }
+ return
+ case <-tt.C:
+ tt.Stop()
+ assert.NoError(t, container.Stop())
+ return
+ }
+ }
+}
diff --git a/plugins/app/tests/hello.php b/plugins/app/tests/hello.php
new file mode 100644
index 00000000..bf9e82cc
--- /dev/null
+++ b/plugins/app/tests/hello.php
@@ -0,0 +1 @@
+<?php echo "hello1 - " . time(); \ No newline at end of file
diff --git a/plugins/app/tests/plugin_1.go b/plugins/app/tests/plugin_1.go
new file mode 100644
index 00000000..7259ea9d
--- /dev/null
+++ b/plugins/app/tests/plugin_1.go
@@ -0,0 +1,55 @@
+package tests
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+type Foo struct {
+ configProvider config.Provider
+ spawner app.WorkerFactory
+}
+
+func (f *Foo) Init(p config.Provider, spw app.WorkerFactory) error {
+ f.configProvider = p
+ f.spawner = spw
+ return nil
+}
+
+func (f *Foo) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ r := &app.Config{}
+ err := f.configProvider.UnmarshalKey("app", r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ cmd, err := f.spawner.CmdFactory(nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ if cmd == nil {
+ errCh <- errors.New("command is nil")
+ return errCh
+ }
+ a := cmd()
+ out, err := a.Output()
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ fmt.Println(string(out))
+
+ return errCh
+}
+
+func (f *Foo) Stop() error {
+ return nil
+}
diff --git a/plugins/app/tests/plugin_2.go b/plugins/app/tests/plugin_2.go
new file mode 100644
index 00000000..fbb9ca11
--- /dev/null
+++ b/plugins/app/tests/plugin_2.go
@@ -0,0 +1,88 @@
+package tests
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+type Foo2 struct {
+ configProvider config.Provider
+ wf app.WorkerFactory
+}
+
+func (f *Foo2) Init(p config.Provider, workerFactory app.WorkerFactory) error {
+ f.configProvider = p
+ f.wf = workerFactory
+ return nil
+}
+
+func (f *Foo2) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ r := &app.Config{}
+ err := f.configProvider.UnmarshalKey("app", r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ cmd, err := f.wf.CmdFactory(nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ if cmd == nil {
+ errCh <- errors.New("command is nil")
+ return errCh
+ }
+ a := cmd()
+ out, err := a.Output()
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ w, err := f.wf.NewWorker(context.Background(), nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ _ = w
+
+ poolConfig := roadrunner.Config{
+ NumWorkers: 10,
+ MaxJobs: 100,
+ AllocateTimeout: time.Second * 10,
+ DestroyTimeout: time.Second * 10,
+ Supervisor: &roadrunner.SupervisorConfig{
+ WatchTick: 60,
+ TTL: 1000,
+ IdleTTL: 10,
+ ExecTTL: 10,
+ MaxWorkerMemory: 1000,
+ },
+ }
+
+ pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ _ = pool
+
+ fmt.Println(string(out))
+
+ return errCh
+}
+
+func (f *Foo2) Stop() error {
+ return nil
+}