diff options
author | Wolfy-J <[email protected]> | 2020-10-25 15:55:51 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2020-10-25 15:55:51 +0300 |
commit | ba5c562f9038ba434e655fb82c44597fcccaff16 (patch) | |
tree | ff112b9dcffda63bc40094a57d0df61622368445 /plugins/factory | |
parent | 3bdf7d02d83d1ff4726f3fbb01a45d016f39abec (diff) |
- massive update in roadrunner 2.0 abstractions
Diffstat (limited to 'plugins/factory')
-rw-r--r-- | plugins/factory/app.go | 114 | ||||
-rw-r--r-- | plugins/factory/app_provider.go | 17 | ||||
-rw-r--r-- | plugins/factory/config.go | 37 | ||||
-rw-r--r-- | plugins/factory/factory.go | 73 | ||||
-rw-r--r-- | plugins/factory/hello.php | 1 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_1.go | 4 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_2.go | 8 |
7 files changed, 121 insertions, 133 deletions
diff --git a/plugins/factory/app.go b/plugins/factory/app.go index 74d8d828..b6cdb3b3 100644 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -1,58 +1,70 @@ package factory import ( - "errors" + "context" "fmt" - "os" - "os/exec" - "strings" - "time" - + "github.com/fatih/color" + "github.com/spiral/endure/errors" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" + "log" + "os" + "os/exec" + "strings" ) -// AppConfig config combines factory, pool and cmd configurations. -type AppConfig struct { - Command string - User string - Group string - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. - RelayTimeout time.Duration +const ServiceName = "app" + +type Env map[string]string + +// AppFactory creates workers for the application. +type AppFactory interface { + NewCmdFactory(env Env) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) } +// App manages worker type App struct { - cfg AppConfig - configProvider config.Provider + cfg Config + factory roadrunner.Factory } -func (app *App) Init(provider config.Provider) error { - app.cfg = AppConfig{} - app.configProvider = provider - - err := app.configProvider.UnmarshalKey("app", &app.cfg) +// Init application provider. +func (app *App) Init(cfg config.Provider) error { + err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { return err } + app.cfg.InitDefaults() - if app.cfg.Relay == "" { - app.cfg.Relay = "pipes" + return nil +} + +func (app *App) Serve() chan error { + errCh := make(chan error, 1) + var err error + + app.factory, err = app.initFactory() + if err != nil { + errCh <- errors.E(errors.Op("init factory"), err) } - return nil + return errCh +} + +func (app *App) Stop() error { + if app.factory == nil { + return nil + } + + return app.factory.Close(context.Background()) } -func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { +func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) { var cmdArgs []string + // create command according to the config cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) @@ -75,15 +87,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { }, nil } -// todo ENV unused -func (app *App) NewFactory() (roadrunner.Factory, error) { +func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + return app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) +} + +func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + if err != nil { + return nil, err + } + + p.AddListener(func(event interface{}) { + if we, ok := event.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } + }) + + return p, nil +} + +func (app *App) initFactory() (roadrunner.Factory, error) { if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } lsn, err := util.CreateListener(app.cfg.Relay) @@ -98,7 +140,7 @@ func (app *App) NewFactory() (roadrunner.Factory, error) { case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil default: - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go deleted file mode 100644 index e13b267f..00000000 --- a/plugins/factory/app_provider.go +++ /dev/null @@ -1,17 +0,0 @@ -package factory - -import ( - "os/exec" - - "github.com/spiral/roadrunner/v2" -) - -type Env map[string]string - -type Spawner interface { - // CmdFactory create new command factory with given env variables. - NewCmd(env Env) (func() *exec.Cmd, error) - - // NewFactory inits new factory for workers. - NewFactory() (roadrunner.Factory, error) -} diff --git a/plugins/factory/config.go b/plugins/factory/config.go new file mode 100644 index 00000000..b2d1d0ad --- /dev/null +++ b/plugins/factory/config.go @@ -0,0 +1,37 @@ +package factory + +import "time" + +// Config config combines factory, pool and cmd configurations. +type Config struct { + // Command to run as application. + Command string + + // User to run application under. + User string + + // Group to run application under. + Group string + + // Env represents application environment. + Env Env + + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration +} + +func (cfg *Config) InitDefaults() { + if cfg.Relay == "" { + cfg.Relay = "pipes" + } + + if cfg.RelayTimeout == 0 { + cfg.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go deleted file mode 100644 index f7303b6d..00000000 --- a/plugins/factory/factory.go +++ /dev/null @@ -1,73 +0,0 @@ -package factory - -import ( - "context" - - "log" - - "github.com/fatih/color" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/events" -) - -type WorkerFactory interface { - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) -} - -type WFactory struct { - events *events.EventBroadcaster - app Spawner - wFactory roadrunner.Factory -} - -func (wf *WFactory) Init(app Spawner) (err error) { - wf.events = events.NewEventBroadcaster() - - wf.app = app - wf.wFactory, err = app.NewFactory() - if err != nil { - return nil - } - - return nil -} - -func (wf *WFactory) AddListener(l events.EventListener) { - wf.events.AddListener(l) -} - -func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { - cmd, err := wf.app.NewCmd(env) - if err != nil { - return nil, err - } - - p, err := roadrunner.NewPool(ctx, cmd, wf.wFactory, opt) - if err != nil { - return nil, err - } - - // TODO event to stop - go func() { - for e := range p.Events() { - wf.events.Push(e) - if we, ok := e.Payload.(roadrunner.WorkerEvent); ok { - if we.Event == roadrunner.EventWorkerLog { - log.Print(color.YellowString(string(we.Payload.([]byte)))) - } - } - } - }() - - return p, nil -} - -func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { - cmd, err := wf.app.NewCmd(env) - if err != nil { - return nil, err - } - - return wf.wFactory.SpawnWorkerWithContext(ctx, cmd()) -} diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php deleted file mode 100644 index c6199449..00000000 --- a/plugins/factory/hello.php +++ /dev/null @@ -1 +0,0 @@ -<?php echo "hello -" . time();
\ No newline at end of file diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go index 5ab6df73..f6b06dd0 100644 --- a/plugins/factory/tests/plugin_1.go +++ b/plugins/factory/tests/plugin_1.go @@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { func (f *Foo) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spawner.NewCmd(nil) + cmd, err := f.spawner.CommandFactory(nil) if err != nil { errCh <- err return errCh diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go index 2409627e..dbdb065b 100644 --- a/plugins/factory/tests/plugin_2.go +++ b/plugins/factory/tests/plugin_2.go @@ -13,11 +13,11 @@ import ( type Foo2 struct { configProvider config.Provider - wf factory.WorkerFactory + wf factory.AppFactory spw factory.Spawner } -func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error { +func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory, spawner factory.Spawner) error { f.configProvider = p f.wf = workerFactory f.spw = spawner @@ -27,14 +27,14 @@ func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spaw func (f *Foo2) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spw.NewCmd(nil) + cmd, err := f.spw.CommandFactory(nil) if err != nil { errCh <- err return errCh |