diff options
author | Valery Piashchynski <[email protected]> | 2020-10-19 14:01:59 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-19 14:01:59 +0300 |
commit | 77670fb7af0c892c9b3a589fd424534fad288e7a (patch) | |
tree | 3adcaa85db664a355abe2b28f1d7e4a3fc45689f /plugins | |
parent | 16fbf3104c3c34bd9355593052b686acd26a8efe (diff) |
Update according activity worker
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/config/provider.go | 4 | ||||
-rw-r--r-- | plugins/config/tests/plugin1.go | 9 | ||||
-rw-r--r-- | plugins/config/viper.go | 9 | ||||
-rw-r--r-- | plugins/factory/app.go | 39 | ||||
-rw-r--r-- | plugins/factory/factory.go | 10 | ||||
-rw-r--r-- | plugins/factory/tests/factory_test.go | 1 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_1.go | 4 |
7 files changed, 46 insertions, 30 deletions
diff --git a/plugins/config/provider.go b/plugins/config/provider.go index bec417e9..580231fd 100644 --- a/plugins/config/provider.go +++ b/plugins/config/provider.go @@ -10,6 +10,10 @@ type Provider interface { // } // } UnmarshalKey(name string, out interface{}) error + // Get used to get config section Get(name string) interface{} + + // Has checks if config section exists. + Has(name string) bool } diff --git a/plugins/config/tests/plugin1.go b/plugins/config/tests/plugin1.go index 7573dc82..7c5f2afd 100644 --- a/plugins/config/tests/plugin1.go +++ b/plugins/config/tests/plugin1.go @@ -15,18 +15,17 @@ type ReloadConfig struct { } type ServiceConfig struct { - Enabled bool + Enabled bool Recursive bool - Patterns []string - Dirs []string - Ignore []string + Patterns []string + Dirs []string + Ignore []string } type Foo struct { configProvider config.Provider } - // Depends on S2 and DB (S3 in the current case) func (f *Foo) Init(p config.Provider) error { f.configProvider = p diff --git a/plugins/config/viper.go b/plugins/config/viper.go index 0362e79b..b276dbe2 100644 --- a/plugins/config/viper.go +++ b/plugins/config/viper.go @@ -17,17 +17,21 @@ type ViperProvider struct { //////// ENDURE ////////// func (v *ViperProvider) Init() error { v.viper = viper.New() + // read in environment variables that match v.viper.AutomaticEnv() if v.Prefix == "" { return errors.New("prefix should be set") } + v.viper.SetEnvPrefix(v.Prefix) if v.Path == "" { return errors.New("path should be set") } + v.viper.SetConfigFile(v.Path) v.viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + return v.viper.ReadInConfig() } @@ -62,6 +66,11 @@ func (v *ViperProvider) Get(name string) interface{} { return v.viper.Get(name) } +// Has checks if config section exists. +func (v *ViperProvider) Has(name string) bool { + return v.viper.IsSet(name) +} + /////////// PRIVATE ////////////// func parseFlag(flag string) (string, string, error) { diff --git a/plugins/factory/app.go b/plugins/factory/app.go index f9e7944c..753ca2a9 100644 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -1,7 +1,6 @@ package factory import ( - "context" "errors" "fmt" "os" @@ -21,11 +20,10 @@ type AppConfig struct { Group string Env Env - Relay string // Listen defines connection method and factory to be used to connect to workers: // "pipes", "tcp://:6001", "unix://rr.sock" // This config section must not change on re-configuration. - Listen string + Relay string // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section // must not change on re-configuration. @@ -33,23 +31,28 @@ type AppConfig struct { } type App struct { - cfg *AppConfig + cfg AppConfig configProvider config.Provider factory roadrunner.Factory } func (app *App) Init(provider config.Provider) error { - app.cfg = &AppConfig{} + app.cfg = AppConfig{} app.configProvider = provider return nil } func (app *App) Configure() error { - err := app.configProvider.UnmarshalKey("app", app.cfg) + err := app.configProvider.UnmarshalKey("app", &app.cfg) if err != nil { return err } + + if app.cfg.Relay == "" { + app.cfg.Relay = "pipes" + } + return nil } @@ -83,30 +86,28 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { // todo ENV unused func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { - // if Listen is empty or doesn't contain separator, return error - if app.cfg.Listen == "" || !strings.Contains(app.cfg.Listen, "://") { - return nil, errors.New("relay should be set") - } - - lsn, err := util.CreateListener(app.cfg.Listen) - if err != nil { - return nil, err + if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { + return roadrunner.NewPipeFactory(), nil } - dsn := strings.Split(app.cfg.Listen, "://") + dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") } + lsn, err := util.CreateListener(app.cfg.Relay) + if err != nil { + return nil, err + } + switch dsn[0] { // sockets group case "unix": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil - // pipes default: - return roadrunner.NewPipeFactory(), nil + return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") } } @@ -116,10 +117,6 @@ func (app *App) Serve() chan error { } func (app *App) Stop() error { - err := app.factory.Close(context.Background()) - if err != nil { - return err - } return nil } diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go index c5490cd6..5d80682d 100644 --- a/plugins/factory/factory.go +++ b/plugins/factory/factory.go @@ -3,9 +3,11 @@ package factory import ( "context" - "github.com/spiral/roadrunner/v2/plugins/events" + "log" + "github.com/fatih/color" "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/plugins/events" ) type WorkerFactory interface { @@ -23,6 +25,7 @@ func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, e if err != nil { return nil, err } + factory, err := wf.spw.NewFactory(env) if err != nil { return nil, err @@ -37,6 +40,11 @@ func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, e go func() { for e := range p.Events() { wf.eb.Push(e) + if we, ok := e.Payload.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } } }() diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go index 38e939e1..72e28f84 100644 --- a/plugins/factory/tests/factory_test.go +++ b/plugins/factory/tests/factory_test.go @@ -46,7 +46,6 @@ func TestFactory(t *testing.T) { t.Fatal(err) } - err = container.Init() if err != nil { t.Fatal(err) diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go index 0c44a0d1..5ab6df73 100644 --- a/plugins/factory/tests/plugin_1.go +++ b/plugins/factory/tests/plugin_1.go @@ -9,8 +9,8 @@ import ( ) type Foo struct { - configProvider config.Provider - spawner factory.Spawner + configProvider config.Provider + spawner factory.Spawner } func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { |