diff options
Diffstat (limited to 'plugins/factory')
-rw-r--r-- | plugins/factory/app.go | 133 | ||||
-rw-r--r-- | plugins/factory/app_provider.go | 17 | ||||
-rw-r--r-- | plugins/factory/factory.go | 67 | ||||
-rw-r--r-- | plugins/factory/hello.php | 1 | ||||
-rw-r--r-- | plugins/factory/tests/.rr.yaml | 9 | ||||
-rw-r--r-- | plugins/factory/tests/factory_test.go | 85 | ||||
-rw-r--r-- | plugins/factory/tests/hello.php | 1 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_1.go | 55 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_2.go | 88 |
9 files changed, 456 insertions, 0 deletions
diff --git a/plugins/factory/app.go b/plugins/factory/app.go new file mode 100644 index 00000000..8ed65531 --- /dev/null +++ b/plugins/factory/app.go @@ -0,0 +1,133 @@ +package factory + +import ( + "context" + "errors" + "fmt" + "os" + "os/exec" + "strings" + "time" + + "github.com/temporalio/roadrunner-temporal/config" + "github.com/temporalio/roadrunner-temporal/roadrunner" + "github.com/temporalio/roadrunner-temporal/roadrunner/util" +) + +// AppConfig config combines factory, pool and cmd configurations. +type AppConfig struct { + Command string + User string + Group string + Env Env + + Relay string + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Listen string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. + RelayTimeout time.Duration +} + +type App struct { + cfg *AppConfig + configProvider config.Provider + factory roadrunner.Factory +} + +func (app *App) Init(provider config.Provider) error { + app.cfg = &AppConfig{} + app.configProvider = provider + + return nil +} + +func (app *App) Configure() error { + err := app.configProvider.UnmarshalKey("app", app.cfg) + if err != nil { + return err + } + return nil +} + +func (app *App) Close() error { + return nil +} + +func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { + var cmdArgs []string + // create command according to the config + cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) + + return func() *exec.Cmd { + cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) + util.IsolateProcess(cmd) + + // if user is not empty, and OS is linux or macos + // execute php worker from that particular user + if app.cfg.User != "" { + err := util.ExecuteFromUser(cmd, app.cfg.User) + if err != nil { + return nil + } + } + + cmd.Env = app.setEnv(env) + + return cmd + }, nil +} + +// todo ENV unused +func (app *App) NewFactory() (roadrunner.Factory, error) { + // if Listen is empty or doesn't contain separator, return error + if app.cfg.Listen == "" || !strings.Contains(app.cfg.Listen, "://") { + return nil, errors.New("relay should be set") + } + + lsn, err := util.CreateListener(app.cfg.Listen) + if err != nil { + return nil, err + } + + dsn := strings.Split(app.cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + } + + switch dsn[0] { + // sockets group + case "unix": + return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + case "tcp": + return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + // pipes + default: + return roadrunner.NewPipeFactory(), nil + } +} + +func (app *App) Serve() chan error { + errCh := make(chan error) + return errCh +} + +func (app *App) Stop() error { + err := app.factory.Close(context.Background()) + if err != nil { + return err + } + return nil +} + +func (app *App) setEnv(e Env) []string { + env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) + for k, v := range e { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + + return env +} diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go new file mode 100644 index 00000000..58fc686c --- /dev/null +++ b/plugins/factory/app_provider.go @@ -0,0 +1,17 @@ +package factory + +import ( + "os/exec" + + "github.com/temporalio/roadrunner-temporal/roadrunner" +) + +type Env map[string]string + +type Spawner interface { + // CmdFactory create new command factory with given env variables. + NewCmd(env Env) (func() *exec.Cmd, error) + + // NewFactory inits new factory for workers. + NewFactory(env Env) (roadrunner.Factory, error) +} diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go new file mode 100644 index 00000000..74fd241f --- /dev/null +++ b/plugins/factory/factory.go @@ -0,0 +1,67 @@ +package factory + +import ( + "context" + "github.com/temporalio/roadrunner-temporal/events" + + "github.com/temporalio/roadrunner-temporal/roadrunner" +) + +type WorkerFactory interface { + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) +} + +type WFactory struct { + spw Spawner + eb *events.EventBroadcaster +} + +func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { + cmd, err := wf.spw.NewCmd(env) + if err != nil { + return nil, err + } + factory, err := wf.spw.NewFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, cmd, factory, opt) + if err != nil { + return nil, err + } + + // TODO event to stop + go func() { + for e := range p.Events() { + wf.eb.Push(e) + } + }() + + return p, nil +} + +func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + cmd, err := wf.spw.NewCmd(env) + if err != nil { + return nil, err + } + + wb, err := roadrunner.InitBaseWorker(cmd()) + if err != nil { + return nil, err + } + + return wb, nil +} + +func (wf *WFactory) Init(app Spawner) error { + wf.spw = app + wf.eb = events.NewEventBroadcaster() + return nil +} + +func (wf *WFactory) AddListener(l events.EventListener) { + wf.eb.AddListener(l) +} diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php new file mode 100644 index 00000000..c6199449 --- /dev/null +++ b/plugins/factory/hello.php @@ -0,0 +1 @@ +<?php echo "hello -" . time();
\ No newline at end of file diff --git a/plugins/factory/tests/.rr.yaml b/plugins/factory/tests/.rr.yaml new file mode 100644 index 00000000..171f51dc --- /dev/null +++ b/plugins/factory/tests/.rr.yaml @@ -0,0 +1,9 @@ +app: + command: "php hello.php" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "pipes" + relayTimeout: "20s"
\ No newline at end of file diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go new file mode 100644 index 00000000..880a7cf8 --- /dev/null +++ b/plugins/factory/tests/factory_test.go @@ -0,0 +1,85 @@ +package tests + +import ( + "os" + "os/signal" + "testing" + "time" + + "github.com/spiral/endure" + "github.com/stretchr/testify/assert" + "github.com/temporalio/roadrunner-temporal/config" + "github.com/temporalio/roadrunner-temporal/factory" +) + +func TestFactory(t *testing.T) { + container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.ViperProvider{} + vp.Path = ".rr.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&factory.App{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&factory.WFactory{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo2{}) + if err != nil { + t.Fatal(err) + } + + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + errCh, err := container.Serve() + if err != nil { + t.Fatal(err) + } + + // stop by CTRL+C + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt) + + tt := time.NewTicker(time.Second * 2) + + for { + select { + case e := <-errCh: + assert.NoError(t, e.Error.Err) + assert.NoError(t, container.Stop()) + return + case <-c: + er := container.Stop() + if er != nil { + panic(er) + } + return + case <-tt.C: + tt.Stop() + assert.NoError(t, container.Stop()) + return + } + } + +} diff --git a/plugins/factory/tests/hello.php b/plugins/factory/tests/hello.php new file mode 100644 index 00000000..bf9e82cc --- /dev/null +++ b/plugins/factory/tests/hello.php @@ -0,0 +1 @@ +<?php echo "hello1 - " . time();
\ No newline at end of file diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go new file mode 100644 index 00000000..a7aba98e --- /dev/null +++ b/plugins/factory/tests/plugin_1.go @@ -0,0 +1,55 @@ +package tests + +import ( + "errors" + "fmt" + + "github.com/temporalio/roadrunner-temporal/config" + "github.com/temporalio/roadrunner-temporal/factory" +) + +type Foo struct { + configProvider config.Provider + spawner factory.Spawner +} + +func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { + f.configProvider = p + f.spawner = spw + return nil +} + +func (f *Foo) Serve() chan error { + errCh := make(chan error, 1) + + r := &factory.AppConfig{} + err := f.configProvider.UnmarshalKey("app", r) + if err != nil { + errCh <- err + return errCh + } + + cmd, err := f.spawner.NewCmd(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.New("command is nil") + return errCh + } + a := cmd() + out, err := a.Output() + if err != nil { + errCh <- err + return errCh + } + + fmt.Println(string(out)) + + return errCh +} + +func (f *Foo) Stop() error { + return nil +} diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go new file mode 100644 index 00000000..d0c3ea2c --- /dev/null +++ b/plugins/factory/tests/plugin_2.go @@ -0,0 +1,88 @@ +package tests + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/temporalio/roadrunner-temporal/config" + "github.com/temporalio/roadrunner-temporal/factory" + "github.com/temporalio/roadrunner-temporal/roadrunner" +) + +type Foo2 struct { + configProvider config.Provider + wf factory.WorkerFactory + spw factory.Spawner +} + +func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error { + f.configProvider = p + f.wf = workerFactory + f.spw = spawner + return nil +} + +func (f *Foo2) Serve() chan error { + errCh := make(chan error, 1) + + r := &factory.AppConfig{} + err := f.configProvider.UnmarshalKey("app", r) + if err != nil { + errCh <- err + return errCh + } + + cmd, err := f.spw.NewCmd(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.New("command is nil") + return errCh + } + a := cmd() + out, err := a.Output() + if err != nil { + errCh <- err + return errCh + } + + w, err := f.wf.NewWorker(context.Background(), nil) + if err != nil { + errCh <- err + return errCh + } + + _ = w + + poolConfig := &roadrunner.Config{ + NumWorkers: 10, + MaxJobs: 100, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + TTL: 1000, + IdleTTL: 1000, + ExecTTL: time.Second * 10, + MaxPoolMemory: 10000, + MaxWorkerMemory: 10000, + } + + pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil) + if err != nil { + errCh <- err + return errCh + } + + _ = pool + + fmt.Println(string(out)) + + return errCh +} + +func (f *Foo2) Stop() error { + return nil +} |