diff options
author | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-13 13:55:20 +0300 |
commit | 0dc44d54cfcc9dd3fa09a41136f35a9a8d26b994 (patch) | |
tree | ffcb65010bebe9f5b5436192979e64b2402a6ec0 /plugins | |
parent | 08d6b6b7f773f83b286cd48c1a0fbec9a62fb42b (diff) |
Initial commit of RR 2.0v2.0.0-alpha1
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/config/provider.go | 15 | ||||
-rw-r--r-- | plugins/config/tests/.rr.yaml | 28 | ||||
-rw-r--r-- | plugins/config/tests/config_test.go | 67 | ||||
-rw-r--r-- | plugins/config/tests/plugin1.go | 54 | ||||
-rw-r--r-- | plugins/config/viper.go | 86 | ||||
-rw-r--r-- | plugins/events/broadcaster.go | 24 | ||||
-rw-r--r-- | plugins/factory/app.go | 133 | ||||
-rw-r--r-- | plugins/factory/app_provider.go | 17 | ||||
-rw-r--r-- | plugins/factory/factory.go | 67 | ||||
-rw-r--r-- | plugins/factory/hello.php | 1 | ||||
-rw-r--r-- | plugins/factory/tests/.rr.yaml | 9 | ||||
-rw-r--r-- | plugins/factory/tests/factory_test.go | 85 | ||||
-rw-r--r-- | plugins/factory/tests/hello.php | 1 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_1.go | 55 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_2.go | 88 |
15 files changed, 730 insertions, 0 deletions
diff --git a/plugins/config/provider.go b/plugins/config/provider.go new file mode 100644 index 00000000..bec417e9 --- /dev/null +++ b/plugins/config/provider.go @@ -0,0 +1,15 @@ +package config + +type Provider interface { + // Unmarshal configuration section into configuration object. + // + // func (h *HttpService) Init(cp config.Provider) error { + // h.config := &HttpConfig{} + // if err := configProvider.UnmarshalKey("http", h.config); err != nil { + // return err + // } + // } + UnmarshalKey(name string, out interface{}) error + // Get used to get config section + Get(name string) interface{} +} diff --git a/plugins/config/tests/.rr.yaml b/plugins/config/tests/.rr.yaml new file mode 100644 index 00000000..df9077d0 --- /dev/null +++ b/plugins/config/tests/.rr.yaml @@ -0,0 +1,28 @@ +reload: + # enable or disable file watcher + enabled: true + # sync interval + interval: 1s + # global patterns to sync + patterns: [".php"] + # list of included for sync services + services: + http: + # recursive search for file patterns to add + recursive: true + # ignored folders + ignore: ["vendor"] + # service specific file pattens to sync + patterns: [".php", ".go",".md",] + # directories to sync. If recursive is set to true, + # recursive sync will be applied only to the directories in `dirs` section + dirs: ["."] + jobs: + recursive: false + ignore: ["service/metrics"] + dirs: ["./jobs"] + rpc: + recursive: true + patterns: [".json"] + # to include all project directories from workdir, leave `dirs` empty or add a dot "." + dirs: [""] diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go new file mode 100644 index 00000000..baeafbd2 --- /dev/null +++ b/plugins/config/tests/config_test.go @@ -0,0 +1,67 @@ +package tests + +import ( + "os" + "os/signal" + "testing" + "time" + + "github.com/spiral/endure" + "github.com/stretchr/testify/assert" + "github.com/temporalio/roadrunner-temporal/config" +) + +func TestViperProvider_Init(t *testing.T) { + container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + if err != nil { + t.Fatal(err) + } + vp := &config.ViperProvider{} + vp.Path = ".rr.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo{}) + if err != nil { + t.Fatal(err) + } + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + errCh, err := container.Serve() + if err != nil { + t.Fatal(err) + } + + // stop by CTRL+C + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt) + + tt := time.NewTicker(time.Second * 2) + + for { + select { + case e := <-errCh: + assert.NoError(t, e.Error.Err) + assert.NoError(t, container.Stop()) + return + case <-c: + er := container.Stop() + if er != nil { + panic(er) + } + return + case <-tt.C: + tt.Stop() + assert.NoError(t, container.Stop()) + return + } + } + +} diff --git a/plugins/config/tests/plugin1.go b/plugins/config/tests/plugin1.go new file mode 100644 index 00000000..4e7a5317 --- /dev/null +++ b/plugins/config/tests/plugin1.go @@ -0,0 +1,54 @@ +package tests + +import ( + "errors" + "time" + + "github.com/temporalio/roadrunner-temporal/config" +) + +// ReloadConfig is a Reload configuration point. +type ReloadConfig struct { + Interval time.Duration + Patterns []string + Services map[string]ServiceConfig +} + +type ServiceConfig struct { + Enabled bool + Recursive bool + Patterns []string + Dirs []string + Ignore []string +} + +type Foo struct { + configProvider config.Provider +} + + +// Depends on S2 and DB (S3 in the current case) +func (f *Foo) Init(p config.Provider) error { + f.configProvider = p + return nil +} + +func (f *Foo) Serve() chan error { + errCh := make(chan error, 1) + + r := &ReloadConfig{} + err := f.configProvider.UnmarshalKey("reload", r) + if err != nil { + errCh <- err + } + + if len(r.Patterns) == 0 { + errCh <- errors.New("should be at least one pattern, but got 0") + } + + return errCh +} + +func (f *Foo) Stop() error { + return nil +} diff --git a/plugins/config/viper.go b/plugins/config/viper.go new file mode 100644 index 00000000..0362e79b --- /dev/null +++ b/plugins/config/viper.go @@ -0,0 +1,86 @@ +package config + +import ( + "errors" + "fmt" + "strings" + + "github.com/spf13/viper" +) + +type ViperProvider struct { + viper *viper.Viper + Path string + Prefix string +} + +//////// ENDURE ////////// +func (v *ViperProvider) Init() error { + v.viper = viper.New() + // read in environment variables that match + v.viper.AutomaticEnv() + if v.Prefix == "" { + return errors.New("prefix should be set") + } + v.viper.SetEnvPrefix(v.Prefix) + if v.Path == "" { + return errors.New("path should be set") + } + v.viper.SetConfigFile(v.Path) + v.viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + return v.viper.ReadInConfig() +} + +///////////// VIPER /////////////// + +// Overwrite overwrites existing config with provided values +func (v *ViperProvider) Overwrite(values map[string]string) error { + if len(values) != 0 { + for _, flag := range values { + key, value, err := parseFlag(flag) + if err != nil { + return err + } + v.viper.Set(key, value) + } + } + + return nil +} + +// +func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error { + err := v.viper.UnmarshalKey(name, &out) + if err != nil { + return err + } + return nil +} + +// Get raw config in a form of config section. +func (v *ViperProvider) Get(name string) interface{} { + return v.viper.Get(name) +} + +/////////// PRIVATE ////////////// + +func parseFlag(flag string) (string, string, error) { + if !strings.Contains(flag, "=") { + return "", "", fmt.Errorf("invalid flag `%s`", flag) + } + + parts := strings.SplitN(strings.TrimLeft(flag, " \"'`"), "=", 2) + + return strings.Trim(parts[0], " \n\t"), parseValue(strings.Trim(parts[1], " \n\t")), nil +} + +func parseValue(value string) string { + escape := []rune(value)[0] + + if escape == '"' || escape == '\'' || escape == '`' { + value = strings.Trim(value, string(escape)) + value = strings.Replace(value, fmt.Sprintf("\\%s", string(escape)), string(escape), -1) + } + + return value +} diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go new file mode 100644 index 00000000..778b307d --- /dev/null +++ b/plugins/events/broadcaster.go @@ -0,0 +1,24 @@ +package events + +type EventListener interface { + Handle(event interface{}) +} + +type EventBroadcaster struct { + listeners []EventListener +} + +func NewEventBroadcaster() *EventBroadcaster { + return &EventBroadcaster{} +} + +func (eb *EventBroadcaster) AddListener(l EventListener) { + // todo: threadcase + eb.listeners = append(eb.listeners, l) +} + +func (eb *EventBroadcaster) Push(e interface{}) { + for _, l := range eb.listeners { + l.Handle(e) + } +} diff --git a/plugins/factory/app.go b/plugins/factory/app.go new file mode 100644 index 00000000..8ed65531 --- /dev/null +++ b/plugins/factory/app.go @@ -0,0 +1,133 @@ +package factory + +import ( + "context" + "errors" + "fmt" + "os" + "os/exec" + "strings" + "time" + + "github.com/temporalio/roadrunner-temporal/config" + "github.com/temporalio/roadrunner-temporal/roadrunner" + "github.com/temporalio/roadrunner-temporal/roadrunner/util" +) + +// AppConfig config combines factory, pool and cmd configurations. +type AppConfig struct { + Command string + User string + Group string + Env Env + + Relay string + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Listen string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. + RelayTimeout time.Duration +} + +type App struct { + cfg *AppConfig + configProvider config.Provider + factory roadrunner.Factory +} + +func (app *App) Init(provider config.Provider) error { + app.cfg = &AppConfig{} + app.configProvider = provider + + return nil +} + +func (app *App) Configure() error { + err := app.configProvider.UnmarshalKey("app", app.cfg) + if err != nil { + return err + } + return nil +} + +func (app *App) Close() error { + return nil +} + +func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { + var cmdArgs []string + // create command according to the config + cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) + + return func() *exec.Cmd { + cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...) + util.IsolateProcess(cmd) + + // if user is not empty, and OS is linux or macos + // execute php worker from that particular user + if app.cfg.User != "" { + err := util.ExecuteFromUser(cmd, app.cfg.User) + if err != nil { + return nil + } + } + + cmd.Env = app.setEnv(env) + + return cmd + }, nil +} + +// todo ENV unused +func (app *App) NewFactory() (roadrunner.Factory, error) { + // if Listen is empty or doesn't contain separator, return error + if app.cfg.Listen == "" || !strings.Contains(app.cfg.Listen, "://") { + return nil, errors.New("relay should be set") + } + + lsn, err := util.CreateListener(app.cfg.Listen) + if err != nil { + return nil, err + } + + dsn := strings.Split(app.cfg.Listen, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + } + + switch dsn[0] { + // sockets group + case "unix": + return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + case "tcp": + return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil + // pipes + default: + return roadrunner.NewPipeFactory(), nil + } +} + +func (app *App) Serve() chan error { + errCh := make(chan error) + return errCh +} + +func (app *App) Stop() error { + err := app.factory.Close(context.Background()) + if err != nil { + return err + } + return nil +} + +func (app *App) setEnv(e Env) []string { + env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) + for k, v := range e { + env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) + } + + return env +} diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go new file mode 100644 index 00000000..58fc686c --- /dev/null +++ b/plugins/factory/app_provider.go @@ -0,0 +1,17 @@ +package factory + +import ( + "os/exec" + + "github.com/temporalio/roadrunner-temporal/roadrunner" +) + +type Env map[string]string + +type Spawner interface { + // CmdFactory create new command factory with given env variables. + NewCmd(env Env) (func() *exec.Cmd, error) + + // NewFactory inits new factory for workers. + NewFactory(env Env) (roadrunner.Factory, error) +} diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go new file mode 100644 index 00000000..74fd241f --- /dev/null +++ b/plugins/factory/factory.go @@ -0,0 +1,67 @@ +package factory + +import ( + "context" + "github.com/temporalio/roadrunner-temporal/events" + + "github.com/temporalio/roadrunner-temporal/roadrunner" +) + +type WorkerFactory interface { + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) +} + +type WFactory struct { + spw Spawner + eb *events.EventBroadcaster +} + +func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { + cmd, err := wf.spw.NewCmd(env) + if err != nil { + return nil, err + } + factory, err := wf.spw.NewFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, cmd, factory, opt) + if err != nil { + return nil, err + } + + // TODO event to stop + go func() { + for e := range p.Events() { + wf.eb.Push(e) + } + }() + + return p, nil +} + +func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + cmd, err := wf.spw.NewCmd(env) + if err != nil { + return nil, err + } + + wb, err := roadrunner.InitBaseWorker(cmd()) + if err != nil { + return nil, err + } + + return wb, nil +} + +func (wf *WFactory) Init(app Spawner) error { + wf.spw = app + wf.eb = events.NewEventBroadcaster() + return nil +} + +func (wf *WFactory) AddListener(l events.EventListener) { + wf.eb.AddListener(l) +} diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php new file mode 100644 index 00000000..c6199449 --- /dev/null +++ b/plugins/factory/hello.php @@ -0,0 +1 @@ +<?php echo "hello -" . time();
\ No newline at end of file diff --git a/plugins/factory/tests/.rr.yaml b/plugins/factory/tests/.rr.yaml new file mode 100644 index 00000000..171f51dc --- /dev/null +++ b/plugins/factory/tests/.rr.yaml @@ -0,0 +1,9 @@ +app: + command: "php hello.php" + user: "" + group: "" + env: + "RR_CONFIG": "/some/place/on/the/C134" + "RR_CONFIG2": "C138" + relay: "pipes" + relayTimeout: "20s"
\ No newline at end of file diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go new file mode 100644 index 00000000..880a7cf8 --- /dev/null +++ b/plugins/factory/tests/factory_test.go @@ -0,0 +1,85 @@ +package tests + +import ( + "os" + "os/signal" + "testing" + "time" + + "github.com/spiral/endure" + "github.com/stretchr/testify/assert" + "github.com/temporalio/roadrunner-temporal/config" + "github.com/temporalio/roadrunner-temporal/factory" +) + +func TestFactory(t *testing.T) { + container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true)) + if err != nil { + t.Fatal(err) + } + // config plugin + vp := &config.ViperProvider{} + vp.Path = ".rr.yaml" + vp.Prefix = "rr" + err = container.Register(vp) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&factory.App{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&factory.WFactory{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo{}) + if err != nil { + t.Fatal(err) + } + + err = container.Register(&Foo2{}) + if err != nil { + t.Fatal(err) + } + + + err = container.Init() + if err != nil { + t.Fatal(err) + } + + errCh, err := container.Serve() + if err != nil { + t.Fatal(err) + } + + // stop by CTRL+C + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt) + + tt := time.NewTicker(time.Second * 2) + + for { + select { + case e := <-errCh: + assert.NoError(t, e.Error.Err) + assert.NoError(t, container.Stop()) + return + case <-c: + er := container.Stop() + if er != nil { + panic(er) + } + return + case <-tt.C: + tt.Stop() + assert.NoError(t, container.Stop()) + return + } + } + +} diff --git a/plugins/factory/tests/hello.php b/plugins/factory/tests/hello.php new file mode 100644 index 00000000..bf9e82cc --- /dev/null +++ b/plugins/factory/tests/hello.php @@ -0,0 +1 @@ +<?php echo "hello1 - " . time();
\ No newline at end of file diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go new file mode 100644 index 00000000..a7aba98e --- /dev/null +++ b/plugins/factory/tests/plugin_1.go @@ -0,0 +1,55 @@ +package tests + +import ( + "errors" + "fmt" + + "github.com/temporalio/roadrunner-temporal/config" + "github.com/temporalio/roadrunner-temporal/factory" +) + +type Foo struct { + configProvider config.Provider + spawner factory.Spawner +} + +func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { + f.configProvider = p + f.spawner = spw + return nil +} + +func (f *Foo) Serve() chan error { + errCh := make(chan error, 1) + + r := &factory.AppConfig{} + err := f.configProvider.UnmarshalKey("app", r) + if err != nil { + errCh <- err + return errCh + } + + cmd, err := f.spawner.NewCmd(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.New("command is nil") + return errCh + } + a := cmd() + out, err := a.Output() + if err != nil { + errCh <- err + return errCh + } + + fmt.Println(string(out)) + + return errCh +} + +func (f *Foo) Stop() error { + return nil +} diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go new file mode 100644 index 00000000..d0c3ea2c --- /dev/null +++ b/plugins/factory/tests/plugin_2.go @@ -0,0 +1,88 @@ +package tests + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/temporalio/roadrunner-temporal/config" + "github.com/temporalio/roadrunner-temporal/factory" + "github.com/temporalio/roadrunner-temporal/roadrunner" +) + +type Foo2 struct { + configProvider config.Provider + wf factory.WorkerFactory + spw factory.Spawner +} + +func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error { + f.configProvider = p + f.wf = workerFactory + f.spw = spawner + return nil +} + +func (f *Foo2) Serve() chan error { + errCh := make(chan error, 1) + + r := &factory.AppConfig{} + err := f.configProvider.UnmarshalKey("app", r) + if err != nil { + errCh <- err + return errCh + } + + cmd, err := f.spw.NewCmd(nil) + if err != nil { + errCh <- err + return errCh + } + if cmd == nil { + errCh <- errors.New("command is nil") + return errCh + } + a := cmd() + out, err := a.Output() + if err != nil { + errCh <- err + return errCh + } + + w, err := f.wf.NewWorker(context.Background(), nil) + if err != nil { + errCh <- err + return errCh + } + + _ = w + + poolConfig := &roadrunner.Config{ + NumWorkers: 10, + MaxJobs: 100, + AllocateTimeout: time.Second * 10, + DestroyTimeout: time.Second * 10, + TTL: 1000, + IdleTTL: 1000, + ExecTTL: time.Second * 10, + MaxPoolMemory: 10000, + MaxWorkerMemory: 10000, + } + + pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil) + if err != nil { + errCh <- err + return errCh + } + + _ = pool + + fmt.Println(string(out)) + + return errCh +} + +func (f *Foo2) Stop() error { + return nil +} |