summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-19 14:01:59 +0300
committerValery Piashchynski <[email protected]>2020-10-19 14:01:59 +0300
commit77670fb7af0c892c9b3a589fd424534fad288e7a (patch)
tree3adcaa85db664a355abe2b28f1d7e4a3fc45689f /plugins
parent16fbf3104c3c34bd9355593052b686acd26a8efe (diff)
Update according activity worker
Diffstat (limited to 'plugins')
-rw-r--r--plugins/config/provider.go4
-rw-r--r--plugins/config/tests/plugin1.go9
-rw-r--r--plugins/config/viper.go9
-rw-r--r--plugins/factory/app.go39
-rw-r--r--plugins/factory/factory.go10
-rw-r--r--plugins/factory/tests/factory_test.go1
-rw-r--r--plugins/factory/tests/plugin_1.go4
7 files changed, 46 insertions, 30 deletions
diff --git a/plugins/config/provider.go b/plugins/config/provider.go
index bec417e9..580231fd 100644
--- a/plugins/config/provider.go
+++ b/plugins/config/provider.go
@@ -10,6 +10,10 @@ type Provider interface {
// }
// }
UnmarshalKey(name string, out interface{}) error
+
// Get used to get config section
Get(name string) interface{}
+
+ // Has checks if config section exists.
+ Has(name string) bool
}
diff --git a/plugins/config/tests/plugin1.go b/plugins/config/tests/plugin1.go
index 7573dc82..7c5f2afd 100644
--- a/plugins/config/tests/plugin1.go
+++ b/plugins/config/tests/plugin1.go
@@ -15,18 +15,17 @@ type ReloadConfig struct {
}
type ServiceConfig struct {
- Enabled bool
+ Enabled bool
Recursive bool
- Patterns []string
- Dirs []string
- Ignore []string
+ Patterns []string
+ Dirs []string
+ Ignore []string
}
type Foo struct {
configProvider config.Provider
}
-
// Depends on S2 and DB (S3 in the current case)
func (f *Foo) Init(p config.Provider) error {
f.configProvider = p
diff --git a/plugins/config/viper.go b/plugins/config/viper.go
index 0362e79b..b276dbe2 100644
--- a/plugins/config/viper.go
+++ b/plugins/config/viper.go
@@ -17,17 +17,21 @@ type ViperProvider struct {
//////// ENDURE //////////
func (v *ViperProvider) Init() error {
v.viper = viper.New()
+
// read in environment variables that match
v.viper.AutomaticEnv()
if v.Prefix == "" {
return errors.New("prefix should be set")
}
+
v.viper.SetEnvPrefix(v.Prefix)
if v.Path == "" {
return errors.New("path should be set")
}
+
v.viper.SetConfigFile(v.Path)
v.viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
+
return v.viper.ReadInConfig()
}
@@ -62,6 +66,11 @@ func (v *ViperProvider) Get(name string) interface{} {
return v.viper.Get(name)
}
+// Has checks if config section exists.
+func (v *ViperProvider) Has(name string) bool {
+ return v.viper.IsSet(name)
+}
+
/////////// PRIVATE //////////////
func parseFlag(flag string) (string, string, error) {
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index f9e7944c..753ca2a9 100644
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -1,7 +1,6 @@
package factory
import (
- "context"
"errors"
"fmt"
"os"
@@ -21,11 +20,10 @@ type AppConfig struct {
Group string
Env Env
- Relay string
// Listen defines connection method and factory to be used to connect to workers:
// "pipes", "tcp://:6001", "unix://rr.sock"
// This config section must not change on re-configuration.
- Listen string
+ Relay string
// RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
// must not change on re-configuration.
@@ -33,23 +31,28 @@ type AppConfig struct {
}
type App struct {
- cfg *AppConfig
+ cfg AppConfig
configProvider config.Provider
factory roadrunner.Factory
}
func (app *App) Init(provider config.Provider) error {
- app.cfg = &AppConfig{}
+ app.cfg = AppConfig{}
app.configProvider = provider
return nil
}
func (app *App) Configure() error {
- err := app.configProvider.UnmarshalKey("app", app.cfg)
+ err := app.configProvider.UnmarshalKey("app", &app.cfg)
if err != nil {
return err
}
+
+ if app.cfg.Relay == "" {
+ app.cfg.Relay = "pipes"
+ }
+
return nil
}
@@ -83,30 +86,28 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
// todo ENV unused
func (app *App) NewFactory(env Env) (roadrunner.Factory, error) {
- // if Listen is empty or doesn't contain separator, return error
- if app.cfg.Listen == "" || !strings.Contains(app.cfg.Listen, "://") {
- return nil, errors.New("relay should be set")
- }
-
- lsn, err := util.CreateListener(app.cfg.Listen)
- if err != nil {
- return nil, err
+ if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
+ return roadrunner.NewPipeFactory(), nil
}
- dsn := strings.Split(app.cfg.Listen, "://")
+ dsn := strings.Split(app.cfg.Relay, "://")
if len(dsn) != 2 {
return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
}
+ lsn, err := util.CreateListener(app.cfg.Relay)
+ if err != nil {
+ return nil, err
+ }
+
switch dsn[0] {
// sockets group
case "unix":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
case "tcp":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
- // pipes
default:
- return roadrunner.NewPipeFactory(), nil
+ return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
}
}
@@ -116,10 +117,6 @@ func (app *App) Serve() chan error {
}
func (app *App) Stop() error {
- err := app.factory.Close(context.Background())
- if err != nil {
- return err
- }
return nil
}
diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go
index c5490cd6..5d80682d 100644
--- a/plugins/factory/factory.go
+++ b/plugins/factory/factory.go
@@ -3,9 +3,11 @@ package factory
import (
"context"
- "github.com/spiral/roadrunner/v2/plugins/events"
+ "log"
+ "github.com/fatih/color"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/plugins/events"
)
type WorkerFactory interface {
@@ -23,6 +25,7 @@ func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, e
if err != nil {
return nil, err
}
+
factory, err := wf.spw.NewFactory(env)
if err != nil {
return nil, err
@@ -37,6 +40,11 @@ func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, e
go func() {
for e := range p.Events() {
wf.eb.Push(e)
+ if we, ok := e.Payload.(roadrunner.WorkerEvent); ok {
+ if we.Event == roadrunner.EventWorkerLog {
+ log.Print(color.YellowString(string(we.Payload.([]byte))))
+ }
+ }
}
}()
diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go
index 38e939e1..72e28f84 100644
--- a/plugins/factory/tests/factory_test.go
+++ b/plugins/factory/tests/factory_test.go
@@ -46,7 +46,6 @@ func TestFactory(t *testing.T) {
t.Fatal(err)
}
-
err = container.Init()
if err != nil {
t.Fatal(err)
diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go
index 0c44a0d1..5ab6df73 100644
--- a/plugins/factory/tests/plugin_1.go
+++ b/plugins/factory/tests/plugin_1.go
@@ -9,8 +9,8 @@ import (
)
type Foo struct {
- configProvider config.Provider
- spawner factory.Spawner
+ configProvider config.Provider
+ spawner factory.Spawner
}
func (f *Foo) Init(p config.Provider, spw factory.Spawner) error {