diff options
Diffstat (limited to 'plugins/app')
-rw-r--r-- | plugins/app/app.go | 169 | ||||
-rw-r--r-- | plugins/app/config.go | 37 | ||||
-rw-r--r-- | plugins/app/tests/.rr.yaml | 9 | ||||
-rw-r--r-- | plugins/app/tests/factory_test.go | 78 | ||||
-rw-r--r-- | plugins/app/tests/hello.php | 1 | ||||
-rw-r--r-- | plugins/app/tests/plugin_1.go | 55 | ||||
-rw-r--r-- | plugins/app/tests/plugin_2.go | 88 |
7 files changed, 437 insertions, 0 deletions
diff --git a/plugins/app/app.go b/plugins/app/app.go new file mode 100644 index 00000000..ebb42631 --- /dev/null +++ b/plugins/app/app.go @@ -0,0 +1,169 @@ +package app + +import ( + "context" + "fmt" + "go.uber.org/zap" + "log" + "os" + "os/exec" + "strings" + + "github.com/fatih/color" + "github.com/spiral/endure/errors" + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/util" +) + +const ServiceName = "app" + +type Env map[string]string + +// WorkerFactory creates workers for the application. +type WorkerFactory interface { + CmdFactory(env Env) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) +} + +// App manages worker +type App struct { + cfg Config + log *zap.Logger + factory roadrunner.Factory +} + +// Init application provider. +func (app *App) Init(cfg config.Provider, log *zap.Logger) error { + err := cfg.UnmarshalKey(ServiceName, &app.cfg) + if err != nil { + return err + } + app.cfg.InitDefaults() + app.log = log + + return nil +} + +// Name contains service name. +func (app *App) Name() string { + return ServiceName +} + +func (app *App) Serve() chan error { + errCh := make(chan error, 1) + var err error + + app.factory, err = app.initFactory() + if err != nil { + errCh <- errors.E(errors.Op("init factory"), err) + } + + app.log.Info("Started worker factory", zap.Any("relay", app.cfg.Relay), zap.Any("command", app.cfg.Command)) + + return errCh +} + +func (app *App) Stop() error { + if app.factory == nil { + return nil + } + + return app.factory.Close(context.Background()) +} + +// CmdFactory provides worker command factory assocated with given context. +func (app *App) CmdFactory(env Env) (func() *exec.Cmd, error) { + var cmdArgs []string + + // create command according to the config + cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) + + return func() *exec.Cmd { + cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) + util.IsolateProcess(cmd) + + // if user is not empty, and OS is linux or macos + // execute php worker from that particular user + if app.cfg.User != "" { + err := util.ExecuteFromUser(cmd, app.cfg.User) + if err != nil { + return nil + } + } + + cmd.Env = app.setEnv(env) + + return cmd + }, nil +} + +// NewWorker issues new standalone worker. +func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + spawnCmd, err := app.CmdFactory(env) + if err != nil { + return nil, err + } + + return app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) +} + +// NewWorkerPool issues new worker pool. +func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { + spawnCmd, err := app.CmdFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + if err != nil { + return nil, err + } + + p.AddListener(func(event interface{}) { + if we, ok := event.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } + }) + + return p, nil +} + +// creates relay and worker factory. +func (app *App) initFactory() (roadrunner.Factory, error) { + if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { + return roadrunner.NewPipeFactory(), nil + } + + dsn := strings.Split(app.cfg.Relay, "://") + if len(dsn) != 2 { + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + } + + lsn, err := util.CreateListener(app.cfg.Relay) + if err != nil { + return nil, err + } + + switch dsn[0] { + // sockets group + case "unix": + return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + case "tcp": + return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + default: + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) + } +} + +func (app *App) setEnv(e Env) []string { + env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) + for k, v := range e { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + + return env +} diff --git a/plugins/app/config.go b/plugins/app/config.go new file mode 100644 index 00000000..eaa54e2d --- /dev/null +++ b/plugins/app/config.go @@ -0,0 +1,37 @@ +package app + +import "time" + +// Config config combines factory, pool and cmd configurations. +type Config struct { + // Command to run as application. + Command string + + // User to run application under. + User string + + // Group to run application under. + Group string + + // Env represents application environment. + Env Env + + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration +} + +func (cfg *Config) InitDefaults() { + if cfg.Relay == "" { + cfg.Relay = "pipes" + } + + if cfg.RelayTimeout == 0 { + cfg.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/app/tests/.rr.yaml b/plugins/app/tests/.rr.yaml new file mode 100644 index 00000000..171f51dc --- /dev/null +++ b/plugins/app/tests/.rr.yaml @@ -0,0 +1,9 @@ +app: + command: "php hello.php" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "pipes" + relayTimeout: "20s"
\ No newline at end of file diff --git a/plugins/app/tests/factory_test.go b/plugins/app/tests/factory_test.go new file mode 100644 index 00000000..7c885797 --- /dev/null +++ b/plugins/app/tests/factory_test.go @@ -0,0 +1,78 @@ +package tests + +import ( + "os" + "os/signal" + "testing" + "time" + + "github.com/spiral/endure" + "github.com/spiral/roadrunner/v2/plugins/app" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/stretchr/testify/assert" +) + +func TestFactory(t *testing.T) { + container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.ViperProvider{} + vp.Path = ".rr.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&app.App{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo2{}) + if err != nil { + t.Fatal(err) + } + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + errCh, err := container.Serve() + if err != nil { + t.Fatal(err) + } + + // stop by CTRL+C + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + + tt := time.NewTicker(time.Second * 2) + + for { + select { + case e := <-errCh: + assert.NoError(t, e.Error) + assert.NoError(t, container.Stop()) + return + case <-c: + er := container.Stop() + if er != nil { + panic(er) + } + return + case <-tt.C: + tt.Stop() + assert.NoError(t, container.Stop()) + return + } + } +} diff --git a/plugins/app/tests/hello.php b/plugins/app/tests/hello.php new file mode 100644 index 00000000..bf9e82cc --- /dev/null +++ b/plugins/app/tests/hello.php @@ -0,0 +1 @@ +<?php echo "hello1 - " . time();
\ No newline at end of file diff --git a/plugins/app/tests/plugin_1.go b/plugins/app/tests/plugin_1.go new file mode 100644 index 00000000..7259ea9d --- /dev/null +++ b/plugins/app/tests/plugin_1.go @@ -0,0 +1,55 @@ +package tests + +import ( + "errors" + "fmt" + + "github.com/spiral/roadrunner/v2/plugins/app" + "github.com/spiral/roadrunner/v2/plugins/config" +) + +type Foo struct { + configProvider config.Provider + spawner app.WorkerFactory +} + +func (f *Foo) Init(p config.Provider, spw app.WorkerFactory) error { + f.configProvider = p + f.spawner = spw + return nil +} + +func (f *Foo) Serve() chan error { + errCh := make(chan error, 1) + + r := &app.Config{} + err := f.configProvider.UnmarshalKey("app", r) + if err != nil { + errCh <- err + return errCh + } + + cmd, err := f.spawner.CmdFactory(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.New("command is nil") + return errCh + } + a := cmd() + out, err := a.Output() + if err != nil { + errCh <- err + return errCh + } + + fmt.Println(string(out)) + + return errCh +} + +func (f *Foo) Stop() error { + return nil +} diff --git a/plugins/app/tests/plugin_2.go b/plugins/app/tests/plugin_2.go new file mode 100644 index 00000000..fbb9ca11 --- /dev/null +++ b/plugins/app/tests/plugin_2.go @@ -0,0 +1,88 @@ +package tests + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/plugins/app" + "github.com/spiral/roadrunner/v2/plugins/config" +) + +type Foo2 struct { + configProvider config.Provider + wf app.WorkerFactory +} + +func (f *Foo2) Init(p config.Provider, workerFactory app.WorkerFactory) error { + f.configProvider = p + f.wf = workerFactory + return nil +} + +func (f *Foo2) Serve() chan error { + errCh := make(chan error, 1) + + r := &app.Config{} + err := f.configProvider.UnmarshalKey("app", r) + if err != nil { + errCh <- err + return errCh + } + + cmd, err := f.wf.CmdFactory(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.New("command is nil") + return errCh + } + a := cmd() + out, err := a.Output() + if err != nil { + errCh <- err + return errCh + } + + w, err := f.wf.NewWorker(context.Background(), nil) + if err != nil { + errCh <- err + return errCh + } + + _ = w + + poolConfig := roadrunner.Config{ + NumWorkers: 10, + MaxJobs: 100, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + Supervisor: &roadrunner.SupervisorConfig{ + WatchTick: 60, + TTL: 1000, + IdleTTL: 10, + ExecTTL: 10, + MaxWorkerMemory: 1000, + }, + } + + pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil) + if err != nil { + errCh <- err + return errCh + } + + _ = pool + + fmt.Println(string(out)) + + return errCh +} + +func (f *Foo2) Stop() error { + return nil +} |