summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-13 13:55:20 +0300
committerValery Piashchynski <[email protected]>2020-10-13 13:55:20 +0300
commit0dc44d54cfcc9dd3fa09a41136f35a9a8d26b994 (patch)
treeffcb65010bebe9f5b5436192979e64b2402a6ec0 /plugins
parent08d6b6b7f773f83b286cd48c1a0fbec9a62fb42b (diff)
Initial commit of RR 2.0v2.0.0-alpha1
Diffstat (limited to 'plugins')
-rw-r--r--plugins/config/provider.go15
-rw-r--r--plugins/config/tests/.rr.yaml28
-rw-r--r--plugins/config/tests/config_test.go67
-rw-r--r--plugins/config/tests/plugin1.go54
-rw-r--r--plugins/config/viper.go86
-rw-r--r--plugins/events/broadcaster.go24
-rw-r--r--plugins/factory/app.go133
-rw-r--r--plugins/factory/app_provider.go17
-rw-r--r--plugins/factory/factory.go67
-rw-r--r--plugins/factory/hello.php1
-rw-r--r--plugins/factory/tests/.rr.yaml9
-rw-r--r--plugins/factory/tests/factory_test.go85
-rw-r--r--plugins/factory/tests/hello.php1
-rw-r--r--plugins/factory/tests/plugin_1.go55
-rw-r--r--plugins/factory/tests/plugin_2.go88
15 files changed, 730 insertions, 0 deletions
diff --git a/plugins/config/provider.go b/plugins/config/provider.go
new file mode 100644
index 00000000..bec417e9
--- /dev/null
+++ b/plugins/config/provider.go
@@ -0,0 +1,15 @@
+package config
+
+type Provider interface {
+ // Unmarshal configuration section into configuration object.
+ //
+ // func (h *HttpService) Init(cp config.Provider) error {
+ // h.config := &HttpConfig{}
+ // if err := configProvider.UnmarshalKey("http", h.config); err != nil {
+ // return err
+ // }
+ // }
+ UnmarshalKey(name string, out interface{}) error
+ // Get used to get config section
+ Get(name string) interface{}
+}
diff --git a/plugins/config/tests/.rr.yaml b/plugins/config/tests/.rr.yaml
new file mode 100644
index 00000000..df9077d0
--- /dev/null
+++ b/plugins/config/tests/.rr.yaml
@@ -0,0 +1,28 @@
+reload:
+ # enable or disable file watcher
+ enabled: true
+ # sync interval
+ interval: 1s
+ # global patterns to sync
+ patterns: [".php"]
+ # list of included for sync services
+ services:
+ http:
+ # recursive search for file patterns to add
+ recursive: true
+ # ignored folders
+ ignore: ["vendor"]
+ # service specific file pattens to sync
+ patterns: [".php", ".go",".md",]
+ # directories to sync. If recursive is set to true,
+ # recursive sync will be applied only to the directories in `dirs` section
+ dirs: ["."]
+ jobs:
+ recursive: false
+ ignore: ["service/metrics"]
+ dirs: ["./jobs"]
+ rpc:
+ recursive: true
+ patterns: [".json"]
+ # to include all project directories from workdir, leave `dirs` empty or add a dot "."
+ dirs: [""]
diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go
new file mode 100644
index 00000000..baeafbd2
--- /dev/null
+++ b/plugins/config/tests/config_test.go
@@ -0,0 +1,67 @@
+package tests
+
+import (
+ "os"
+ "os/signal"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ "github.com/stretchr/testify/assert"
+ "github.com/temporalio/roadrunner-temporal/config"
+)
+
+func TestViperProvider_Init(t *testing.T) {
+ container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true))
+ if err != nil {
+ t.Fatal(err)
+ }
+ vp := &config.ViperProvider{}
+ vp.Path = ".rr.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ errCh, err := container.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // stop by CTRL+C
+ c := make(chan os.Signal)
+ signal.Notify(c, os.Interrupt)
+
+ tt := time.NewTicker(time.Second * 2)
+
+ for {
+ select {
+ case e := <-errCh:
+ assert.NoError(t, e.Error.Err)
+ assert.NoError(t, container.Stop())
+ return
+ case <-c:
+ er := container.Stop()
+ if er != nil {
+ panic(er)
+ }
+ return
+ case <-tt.C:
+ tt.Stop()
+ assert.NoError(t, container.Stop())
+ return
+ }
+ }
+
+}
diff --git a/plugins/config/tests/plugin1.go b/plugins/config/tests/plugin1.go
new file mode 100644
index 00000000..4e7a5317
--- /dev/null
+++ b/plugins/config/tests/plugin1.go
@@ -0,0 +1,54 @@
+package tests
+
+import (
+ "errors"
+ "time"
+
+ "github.com/temporalio/roadrunner-temporal/config"
+)
+
+// ReloadConfig is a Reload configuration point.
+type ReloadConfig struct {
+ Interval time.Duration
+ Patterns []string
+ Services map[string]ServiceConfig
+}
+
+type ServiceConfig struct {
+ Enabled bool
+ Recursive bool
+ Patterns []string
+ Dirs []string
+ Ignore []string
+}
+
+type Foo struct {
+ configProvider config.Provider
+}
+
+
+// Depends on S2 and DB (S3 in the current case)
+func (f *Foo) Init(p config.Provider) error {
+ f.configProvider = p
+ return nil
+}
+
+func (f *Foo) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ r := &ReloadConfig{}
+ err := f.configProvider.UnmarshalKey("reload", r)
+ if err != nil {
+ errCh <- err
+ }
+
+ if len(r.Patterns) == 0 {
+ errCh <- errors.New("should be at least one pattern, but got 0")
+ }
+
+ return errCh
+}
+
+func (f *Foo) Stop() error {
+ return nil
+}
diff --git a/plugins/config/viper.go b/plugins/config/viper.go
new file mode 100644
index 00000000..0362e79b
--- /dev/null
+++ b/plugins/config/viper.go
@@ -0,0 +1,86 @@
+package config
+
+import (
+ "errors"
+ "fmt"
+ "strings"
+
+ "github.com/spf13/viper"
+)
+
+type ViperProvider struct {
+ viper *viper.Viper
+ Path string
+ Prefix string
+}
+
+//////// ENDURE //////////
+func (v *ViperProvider) Init() error {
+ v.viper = viper.New()
+ // read in environment variables that match
+ v.viper.AutomaticEnv()
+ if v.Prefix == "" {
+ return errors.New("prefix should be set")
+ }
+ v.viper.SetEnvPrefix(v.Prefix)
+ if v.Path == "" {
+ return errors.New("path should be set")
+ }
+ v.viper.SetConfigFile(v.Path)
+ v.viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
+ return v.viper.ReadInConfig()
+}
+
+///////////// VIPER ///////////////
+
+// Overwrite overwrites existing config with provided values
+func (v *ViperProvider) Overwrite(values map[string]string) error {
+ if len(values) != 0 {
+ for _, flag := range values {
+ key, value, err := parseFlag(flag)
+ if err != nil {
+ return err
+ }
+ v.viper.Set(key, value)
+ }
+ }
+
+ return nil
+}
+
+//
+func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error {
+ err := v.viper.UnmarshalKey(name, &out)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// Get raw config in a form of config section.
+func (v *ViperProvider) Get(name string) interface{} {
+ return v.viper.Get(name)
+}
+
+/////////// PRIVATE //////////////
+
+func parseFlag(flag string) (string, string, error) {
+ if !strings.Contains(flag, "=") {
+ return "", "", fmt.Errorf("invalid flag `%s`", flag)
+ }
+
+ parts := strings.SplitN(strings.TrimLeft(flag, " \"'`"), "=", 2)
+
+ return strings.Trim(parts[0], " \n\t"), parseValue(strings.Trim(parts[1], " \n\t")), nil
+}
+
+func parseValue(value string) string {
+ escape := []rune(value)[0]
+
+ if escape == '"' || escape == '\'' || escape == '`' {
+ value = strings.Trim(value, string(escape))
+ value = strings.Replace(value, fmt.Sprintf("\\%s", string(escape)), string(escape), -1)
+ }
+
+ return value
+}
diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go
new file mode 100644
index 00000000..778b307d
--- /dev/null
+++ b/plugins/events/broadcaster.go
@@ -0,0 +1,24 @@
+package events
+
+type EventListener interface {
+ Handle(event interface{})
+}
+
+type EventBroadcaster struct {
+ listeners []EventListener
+}
+
+func NewEventBroadcaster() *EventBroadcaster {
+ return &EventBroadcaster{}
+}
+
+func (eb *EventBroadcaster) AddListener(l EventListener) {
+ // todo: threadcase
+ eb.listeners = append(eb.listeners, l)
+}
+
+func (eb *EventBroadcaster) Push(e interface{}) {
+ for _, l := range eb.listeners {
+ l.Handle(e)
+ }
+}
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
new file mode 100644
index 00000000..8ed65531
--- /dev/null
+++ b/plugins/factory/app.go
@@ -0,0 +1,133 @@
+package factory
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "os/exec"
+ "strings"
+ "time"
+
+ "github.com/temporalio/roadrunner-temporal/config"
+ "github.com/temporalio/roadrunner-temporal/roadrunner"
+ "github.com/temporalio/roadrunner-temporal/roadrunner/util"
+)
+
+// AppConfig config combines factory, pool and cmd configurations.
+type AppConfig struct {
+ Command string
+ User string
+ Group string
+ Env Env
+
+ Relay string
+ // Listen defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Listen string
+
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration.
+ RelayTimeout time.Duration
+}
+
+type App struct {
+ cfg *AppConfig
+ configProvider config.Provider
+ factory roadrunner.Factory
+}
+
+func (app *App) Init(provider config.Provider) error {
+ app.cfg = &AppConfig{}
+ app.configProvider = provider
+
+ return nil
+}
+
+func (app *App) Configure() error {
+ err := app.configProvider.UnmarshalKey("app", app.cfg)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (app *App) Close() error {
+ return nil
+}
+
+func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
+ var cmdArgs []string
+ // create command according to the config
+ cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...)
+
+ return func() *exec.Cmd {
+ cmd := exec.Command(cmdArgs[0], cmdArgs[1:]...)
+ util.IsolateProcess(cmd)
+
+ // if user is not empty, and OS is linux or macos
+ // execute php worker from that particular user
+ if app.cfg.User != "" {
+ err := util.ExecuteFromUser(cmd, app.cfg.User)
+ if err != nil {
+ return nil
+ }
+ }
+
+ cmd.Env = app.setEnv(env)
+
+ return cmd
+ }, nil
+}
+
+// todo ENV unused
+func (app *App) NewFactory() (roadrunner.Factory, error) {
+ // if Listen is empty or doesn't contain separator, return error
+ if app.cfg.Listen == "" || !strings.Contains(app.cfg.Listen, "://") {
+ return nil, errors.New("relay should be set")
+ }
+
+ lsn, err := util.CreateListener(app.cfg.Listen)
+ if err != nil {
+ return nil, err
+ }
+
+ dsn := strings.Split(app.cfg.Listen, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
+ }
+
+ switch dsn[0] {
+ // sockets group
+ case "unix":
+ return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ case "tcp":
+ return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
+ // pipes
+ default:
+ return roadrunner.NewPipeFactory(), nil
+ }
+}
+
+func (app *App) Serve() chan error {
+ errCh := make(chan error)
+ return errCh
+}
+
+func (app *App) Stop() error {
+ err := app.factory.Close(context.Background())
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (app *App) setEnv(e Env) []string {
+ env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay))
+ for k, v := range e {
+ env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
+ }
+
+ return env
+}
diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go
new file mode 100644
index 00000000..58fc686c
--- /dev/null
+++ b/plugins/factory/app_provider.go
@@ -0,0 +1,17 @@
+package factory
+
+import (
+ "os/exec"
+
+ "github.com/temporalio/roadrunner-temporal/roadrunner"
+)
+
+type Env map[string]string
+
+type Spawner interface {
+ // CmdFactory create new command factory with given env variables.
+ NewCmd(env Env) (func() *exec.Cmd, error)
+
+ // NewFactory inits new factory for workers.
+ NewFactory(env Env) (roadrunner.Factory, error)
+}
diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go
new file mode 100644
index 00000000..74fd241f
--- /dev/null
+++ b/plugins/factory/factory.go
@@ -0,0 +1,67 @@
+package factory
+
+import (
+ "context"
+ "github.com/temporalio/roadrunner-temporal/events"
+
+ "github.com/temporalio/roadrunner-temporal/roadrunner"
+)
+
+type WorkerFactory interface {
+ NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
+ NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error)
+}
+
+type WFactory struct {
+ spw Spawner
+ eb *events.EventBroadcaster
+}
+
+func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) {
+ cmd, err := wf.spw.NewCmd(env)
+ if err != nil {
+ return nil, err
+ }
+ factory, err := wf.spw.NewFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ p, err := roadrunner.NewPool(ctx, cmd, factory, opt)
+ if err != nil {
+ return nil, err
+ }
+
+ // TODO event to stop
+ go func() {
+ for e := range p.Events() {
+ wf.eb.Push(e)
+ }
+ }()
+
+ return p, nil
+}
+
+func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+ cmd, err := wf.spw.NewCmd(env)
+ if err != nil {
+ return nil, err
+ }
+
+ wb, err := roadrunner.InitBaseWorker(cmd())
+ if err != nil {
+ return nil, err
+ }
+
+ return wb, nil
+}
+
+func (wf *WFactory) Init(app Spawner) error {
+ wf.spw = app
+ wf.eb = events.NewEventBroadcaster()
+ return nil
+}
+
+func (wf *WFactory) AddListener(l events.EventListener) {
+ wf.eb.AddListener(l)
+}
diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php
new file mode 100644
index 00000000..c6199449
--- /dev/null
+++ b/plugins/factory/hello.php
@@ -0,0 +1 @@
+<?php echo "hello -" . time(); \ No newline at end of file
diff --git a/plugins/factory/tests/.rr.yaml b/plugins/factory/tests/.rr.yaml
new file mode 100644
index 00000000..171f51dc
--- /dev/null
+++ b/plugins/factory/tests/.rr.yaml
@@ -0,0 +1,9 @@
+app:
+ command: "php hello.php"
+ user: ""
+ group: ""
+ env:
+ "RR_CONFIG": "/some/place/on/the/C134"
+ "RR_CONFIG2": "C138"
+ relay: "pipes"
+ relayTimeout: "20s" \ No newline at end of file
diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go
new file mode 100644
index 00000000..880a7cf8
--- /dev/null
+++ b/plugins/factory/tests/factory_test.go
@@ -0,0 +1,85 @@
+package tests
+
+import (
+ "os"
+ "os/signal"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ "github.com/stretchr/testify/assert"
+ "github.com/temporalio/roadrunner-temporal/config"
+ "github.com/temporalio/roadrunner-temporal/factory"
+)
+
+func TestFactory(t *testing.T) {
+ container, err := endure.NewContainer(endure.DebugLevel, endure.RetryOnFail(true))
+ if err != nil {
+ t.Fatal(err)
+ }
+ // config plugin
+ vp := &config.ViperProvider{}
+ vp.Path = ".rr.yaml"
+ vp.Prefix = "rr"
+ err = container.Register(vp)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&factory.App{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&factory.WFactory{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = container.Register(&Foo2{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+
+ err = container.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ errCh, err := container.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // stop by CTRL+C
+ c := make(chan os.Signal)
+ signal.Notify(c, os.Interrupt)
+
+ tt := time.NewTicker(time.Second * 2)
+
+ for {
+ select {
+ case e := <-errCh:
+ assert.NoError(t, e.Error.Err)
+ assert.NoError(t, container.Stop())
+ return
+ case <-c:
+ er := container.Stop()
+ if er != nil {
+ panic(er)
+ }
+ return
+ case <-tt.C:
+ tt.Stop()
+ assert.NoError(t, container.Stop())
+ return
+ }
+ }
+
+}
diff --git a/plugins/factory/tests/hello.php b/plugins/factory/tests/hello.php
new file mode 100644
index 00000000..bf9e82cc
--- /dev/null
+++ b/plugins/factory/tests/hello.php
@@ -0,0 +1 @@
+<?php echo "hello1 - " . time(); \ No newline at end of file
diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go
new file mode 100644
index 00000000..a7aba98e
--- /dev/null
+++ b/plugins/factory/tests/plugin_1.go
@@ -0,0 +1,55 @@
+package tests
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/temporalio/roadrunner-temporal/config"
+ "github.com/temporalio/roadrunner-temporal/factory"
+)
+
+type Foo struct {
+ configProvider config.Provider
+ spawner factory.Spawner
+}
+
+func (f *Foo) Init(p config.Provider, spw factory.Spawner) error {
+ f.configProvider = p
+ f.spawner = spw
+ return nil
+}
+
+func (f *Foo) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ r := &factory.AppConfig{}
+ err := f.configProvider.UnmarshalKey("app", r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ cmd, err := f.spawner.NewCmd(nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ if cmd == nil {
+ errCh <- errors.New("command is nil")
+ return errCh
+ }
+ a := cmd()
+ out, err := a.Output()
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ fmt.Println(string(out))
+
+ return errCh
+}
+
+func (f *Foo) Stop() error {
+ return nil
+}
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go
new file mode 100644
index 00000000..d0c3ea2c
--- /dev/null
+++ b/plugins/factory/tests/plugin_2.go
@@ -0,0 +1,88 @@
+package tests
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/temporalio/roadrunner-temporal/config"
+ "github.com/temporalio/roadrunner-temporal/factory"
+ "github.com/temporalio/roadrunner-temporal/roadrunner"
+)
+
+type Foo2 struct {
+ configProvider config.Provider
+ wf factory.WorkerFactory
+ spw factory.Spawner
+}
+
+func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error {
+ f.configProvider = p
+ f.wf = workerFactory
+ f.spw = spawner
+ return nil
+}
+
+func (f *Foo2) Serve() chan error {
+ errCh := make(chan error, 1)
+
+ r := &factory.AppConfig{}
+ err := f.configProvider.UnmarshalKey("app", r)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ cmd, err := f.spw.NewCmd(nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ if cmd == nil {
+ errCh <- errors.New("command is nil")
+ return errCh
+ }
+ a := cmd()
+ out, err := a.Output()
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ w, err := f.wf.NewWorker(context.Background(), nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ _ = w
+
+ poolConfig := &roadrunner.Config{
+ NumWorkers: 10,
+ MaxJobs: 100,
+ AllocateTimeout: time.Second * 10,
+ DestroyTimeout: time.Second * 10,
+ TTL: 1000,
+ IdleTTL: 1000,
+ ExecTTL: time.Second * 10,
+ MaxPoolMemory: 10000,
+ MaxWorkerMemory: 10000,
+ }
+
+ pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+
+ _ = pool
+
+ fmt.Println(string(out))
+
+ return errCh
+}
+
+func (f *Foo2) Stop() error {
+ return nil
+}