diff options
Diffstat (limited to 'plugins/factory')
-rwxr-xr-x | plugins/factory/app.go | 112 | ||||
-rwxr-xr-x | plugins/factory/app_provider.go | 17 | ||||
-rwxr-xr-x | plugins/factory/config.go | 37 | ||||
-rwxr-xr-x | plugins/factory/factory.go | 76 | ||||
-rwxr-xr-x | plugins/factory/hello.php | 1 | ||||
-rwxr-xr-x | plugins/factory/tests/factory_test.go | 7 | ||||
-rwxr-xr-x | plugins/factory/tests/plugin_1.go | 8 | ||||
-rwxr-xr-x | plugins/factory/tests/plugin_2.go | 24 |
8 files changed, 134 insertions, 148 deletions
diff --git a/plugins/factory/app.go b/plugins/factory/app.go index e4002963..4951e3df 100755 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -1,58 +1,76 @@ package factory import ( - "errors" + "context" "fmt" + "log" "os" "os/exec" "strings" - "time" + "github.com/fatih/color" + "github.com/spiral/endure/errors" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" ) -// AppConfig config combines factory, pool and cmd configurations. -type AppConfig struct { - Command string - User string - Group string - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. - RelayTimeout time.Duration +const ServiceName = "app" + +type Env map[string]string + +// AppFactory creates workers for the application. +type AppFactory interface { + NewCmdFactory(env Env) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) } +// App manages worker type App struct { - cfg AppConfig - configProvider config.Provider + cfg Config + factory roadrunner.Factory } -func (app *App) Init(provider config.Provider) error { - app.cfg = AppConfig{} - app.configProvider = provider - - err := app.configProvider.UnmarshalKey("app", &app.cfg) +// Init application provider. +func (app *App) Init(cfg config.Provider) error { + err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { return err } + app.cfg.InitDefaults() + + return nil +} + +// Name contains service name. +func (app *App) Name() string { + return ServiceName +} + +func (app *App) Serve() chan error { + errCh := make(chan error, 1) + var err error + + app.factory, err = app.initFactory() + if err != nil { + errCh <- errors.E(errors.Op("init factory"), err) + } + + return errCh +} - if app.cfg.Relay == "" { - app.cfg.Relay = "pipes" +func (app *App) Stop() error { + if app.factory == nil { + return nil } - return nil + return app.factory.Close(context.Background()) } -func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { +func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) { var cmdArgs []string + // create command according to the config cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) @@ -75,15 +93,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { }, nil } -// todo ENV unused -func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { +func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + return app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) +} + +func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + if err != nil { + return nil, err + } + + p.AddListener(func(event interface{}) { + if we, ok := event.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } + }) + + return p, nil +} + +func (app *App) initFactory() (roadrunner.Factory, error) { if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } lsn, err := util.CreateListener(app.cfg.Relay) @@ -98,7 +146,7 @@ func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil default: - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go deleted file mode 100755 index 024c5bea..00000000 --- a/plugins/factory/app_provider.go +++ /dev/null @@ -1,17 +0,0 @@ -package factory - -import ( - "os/exec" - - "github.com/spiral/roadrunner/v2" -) - -type Env map[string]string - -type Spawner interface { - // CmdFactory create new command factory with given env variables. - NewCmd(env Env) (func() *exec.Cmd, error) - - // NewFactory inits new factory for workers. - NewFactory(env Env) (roadrunner.Factory, error) -} diff --git a/plugins/factory/config.go b/plugins/factory/config.go new file mode 100755 index 00000000..b2d1d0ad --- /dev/null +++ b/plugins/factory/config.go @@ -0,0 +1,37 @@ +package factory + +import "time" + +// Config config combines factory, pool and cmd configurations. +type Config struct { + // Command to run as application. + Command string + + // User to run application under. + User string + + // Group to run application under. + Group string + + // Env represents application environment. + Env Env + + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration +} + +func (cfg *Config) InitDefaults() { + if cfg.Relay == "" { + cfg.Relay = "pipes" + } + + if cfg.RelayTimeout == 0 { + cfg.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go deleted file mode 100755 index 5d80682d..00000000 --- a/plugins/factory/factory.go +++ /dev/null @@ -1,76 +0,0 @@ -package factory - -import ( - "context" - - "log" - - "github.com/fatih/color" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/events" -) - -type WorkerFactory interface { - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) -} - -type WFactory struct { - spw Spawner - eb *events.EventBroadcaster -} - -func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { - cmd, err := wf.spw.NewCmd(env) - if err != nil { - return nil, err - } - - factory, err := wf.spw.NewFactory(env) - if err != nil { - return nil, err - } - - p, err := roadrunner.NewPool(ctx, cmd, factory, opt) - if err != nil { - return nil, err - } - - // TODO event to stop - go func() { - for e := range p.Events() { - wf.eb.Push(e) - if we, ok := e.Payload.(roadrunner.WorkerEvent); ok { - if we.Event == roadrunner.EventWorkerLog { - log.Print(color.YellowString(string(we.Payload.([]byte)))) - } - } - } - }() - - return p, nil -} - -func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { - cmd, err := wf.spw.NewCmd(env) - if err != nil { - return nil, err - } - - wb, err := roadrunner.InitBaseWorker(cmd()) - if err != nil { - return nil, err - } - - return wb, nil -} - -func (wf *WFactory) Init(app Spawner) error { - wf.spw = app - wf.eb = events.NewEventBroadcaster() - return nil -} - -func (wf *WFactory) AddListener(l events.EventListener) { - wf.eb.AddListener(l) -} diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php deleted file mode 100755 index c6199449..00000000 --- a/plugins/factory/hello.php +++ /dev/null @@ -1 +0,0 @@ -<?php echo "hello -" . time();
\ No newline at end of file diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go index 5347083a..6c264fd6 100755 --- a/plugins/factory/tests/factory_test.go +++ b/plugins/factory/tests/factory_test.go @@ -31,11 +31,6 @@ func TestFactory(t *testing.T) { t.Fatal(err) } - err = container.Register(&factory.WFactory{}) - if err != nil { - t.Fatal(err) - } - err = container.Register(&Foo{}) if err != nil { t.Fatal(err) @@ -65,7 +60,7 @@ func TestFactory(t *testing.T) { for { select { case e := <-errCh: - assert.NoError(t, e.Error.Err) + assert.NoError(t, e.Error) assert.NoError(t, container.Stop()) return case <-c: diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go index 5ab6df73..9011bb00 100755 --- a/plugins/factory/tests/plugin_1.go +++ b/plugins/factory/tests/plugin_1.go @@ -10,10 +10,10 @@ import ( type Foo struct { configProvider config.Provider - spawner factory.Spawner + spawner factory.AppFactory } -func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { +func (f *Foo) Init(p config.Provider, spw factory.AppFactory) error { f.configProvider = p f.spawner = spw return nil @@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { func (f *Foo) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spawner.NewCmd(nil) + cmd, err := f.spawner.NewCmdFactory(nil) if err != nil { errCh <- err return errCh diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go index 2409627e..9f401bec 100755 --- a/plugins/factory/tests/plugin_2.go +++ b/plugins/factory/tests/plugin_2.go @@ -13,28 +13,26 @@ import ( type Foo2 struct { configProvider config.Provider - wf factory.WorkerFactory - spw factory.Spawner + wf factory.AppFactory } -func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error { +func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory) error { f.configProvider = p f.wf = workerFactory - f.spw = spawner return nil } func (f *Foo2) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spw.NewCmd(nil) + cmd, err := f.wf.NewCmdFactory(nil) if err != nil { errCh <- err return errCh @@ -58,16 +56,18 @@ func (f *Foo2) Serve() chan error { _ = w - poolConfig := &roadrunner.Config{ + poolConfig := roadrunner.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - TTL: 1000, - IdleTTL: 1000, - ExecTTL: time.Second * 10, - MaxPoolMemory: 10000, - MaxWorkerMemory: 10000, + Supervisor: roadrunner.SupervisorConfig{ + WatchTick: 60, + TTL: 1000, + IdleTTL: 10, + ExecTTL: time.Second * 10, + MaxWorkerMemory: 1000, + }, } pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil) |