summaryrefslogtreecommitdiff
path: root/plugins/factory
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2020-10-25 15:55:51 +0300
committerWolfy-J <[email protected]>2020-10-25 15:55:51 +0300
commitba5c562f9038ba434e655fb82c44597fcccaff16 (patch)
treeff112b9dcffda63bc40094a57d0df61622368445 /plugins/factory
parent3bdf7d02d83d1ff4726f3fbb01a45d016f39abec (diff)
- massive update in roadrunner 2.0 abstractions
Diffstat (limited to 'plugins/factory')
-rw-r--r--plugins/factory/app.go114
-rw-r--r--plugins/factory/app_provider.go17
-rw-r--r--plugins/factory/config.go37
-rw-r--r--plugins/factory/factory.go73
-rw-r--r--plugins/factory/hello.php1
-rw-r--r--plugins/factory/tests/plugin_1.go4
-rw-r--r--plugins/factory/tests/plugin_2.go8
7 files changed, 121 insertions, 133 deletions
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index 74d8d828..b6cdb3b3 100644
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -1,58 +1,70 @@
package factory
import (
- "errors"
+ "context"
"fmt"
- "os"
- "os/exec"
- "strings"
- "time"
-
+ "github.com/fatih/color"
+ "github.com/spiral/endure/errors"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
+ "log"
+ "os"
+ "os/exec"
+ "strings"
)
-// AppConfig config combines factory, pool and cmd configurations.
-type AppConfig struct {
- Command string
- User string
- Group string
- Env Env
-
- // Listen defines connection method and factory to be used to connect to workers:
- // "pipes", "tcp://:6001", "unix://rr.sock"
- // This config section must not change on re-configuration.
- Relay string
-
- // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
- // must not change on re-configuration.
- RelayTimeout time.Duration
+const ServiceName = "app"
+
+type Env map[string]string
+
+// AppFactory creates workers for the application.
+type AppFactory interface {
+ NewCmdFactory(env Env) (func() *exec.Cmd, error)
+ NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
+ NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error)
}
+// App manages worker
type App struct {
- cfg AppConfig
- configProvider config.Provider
+ cfg Config
+ factory roadrunner.Factory
}
-func (app *App) Init(provider config.Provider) error {
- app.cfg = AppConfig{}
- app.configProvider = provider
-
- err := app.configProvider.UnmarshalKey("app", &app.cfg)
+// Init application provider.
+func (app *App) Init(cfg config.Provider) error {
+ err := cfg.UnmarshalKey(ServiceName, &app.cfg)
if err != nil {
return err
}
+ app.cfg.InitDefaults()
- if app.cfg.Relay == "" {
- app.cfg.Relay = "pipes"
+ return nil
+}
+
+func (app *App) Serve() chan error {
+ errCh := make(chan error, 1)
+ var err error
+
+ app.factory, err = app.initFactory()
+ if err != nil {
+ errCh <- errors.E(errors.Op("init factory"), err)
}
- return nil
+ return errCh
+}
+
+func (app *App) Stop() error {
+ if app.factory == nil {
+ return nil
+ }
+
+ return app.factory.Close(context.Background())
}
-func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
+func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) {
var cmdArgs []string
+
// create command according to the config
cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...)
@@ -75,15 +87,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
}, nil
}
-// todo ENV unused
-func (app *App) NewFactory() (roadrunner.Factory, error) {
+func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+ spawnCmd, err := app.NewCmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ return app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+}
+
+func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
+ spawnCmd, err := app.NewCmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt)
+ if err != nil {
+ return nil, err
+ }
+
+ p.AddListener(func(event interface{}) {
+ if we, ok := event.(roadrunner.WorkerEvent); ok {
+ if we.Event == roadrunner.EventWorkerLog {
+ log.Print(color.YellowString(string(we.Payload.([]byte))))
+ }
+ }
+ })
+
+ return p, nil
+}
+
+func (app *App) initFactory() (roadrunner.Factory, error) {
if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
}
dsn := strings.Split(app.cfg.Relay, "://")
if len(dsn) != 2 {
- return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
lsn, err := util.CreateListener(app.cfg.Relay)
@@ -98,7 +140,7 @@ func (app *App) NewFactory() (roadrunner.Factory, error) {
case "tcp":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
default:
- return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go
deleted file mode 100644
index e13b267f..00000000
--- a/plugins/factory/app_provider.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package factory
-
-import (
- "os/exec"
-
- "github.com/spiral/roadrunner/v2"
-)
-
-type Env map[string]string
-
-type Spawner interface {
- // CmdFactory create new command factory with given env variables.
- NewCmd(env Env) (func() *exec.Cmd, error)
-
- // NewFactory inits new factory for workers.
- NewFactory() (roadrunner.Factory, error)
-}
diff --git a/plugins/factory/config.go b/plugins/factory/config.go
new file mode 100644
index 00000000..b2d1d0ad
--- /dev/null
+++ b/plugins/factory/config.go
@@ -0,0 +1,37 @@
+package factory
+
+import "time"
+
+// Config config combines factory, pool and cmd configurations.
+type Config struct {
+ // Command to run as application.
+ Command string
+
+ // User to run application under.
+ User string
+
+ // Group to run application under.
+ Group string
+
+ // Env represents application environment.
+ Env Env
+
+ // Listen defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string
+
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration. Defaults to 60s.
+ RelayTimeout time.Duration
+}
+
+func (cfg *Config) InitDefaults() {
+ if cfg.Relay == "" {
+ cfg.Relay = "pipes"
+ }
+
+ if cfg.RelayTimeout == 0 {
+ cfg.RelayTimeout = time.Second * 60
+ }
+}
diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go
deleted file mode 100644
index f7303b6d..00000000
--- a/plugins/factory/factory.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package factory
-
-import (
- "context"
-
- "log"
-
- "github.com/fatih/color"
- "github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/plugins/events"
-)
-
-type WorkerFactory interface {
- NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
- NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error)
-}
-
-type WFactory struct {
- events *events.EventBroadcaster
- app Spawner
- wFactory roadrunner.Factory
-}
-
-func (wf *WFactory) Init(app Spawner) (err error) {
- wf.events = events.NewEventBroadcaster()
-
- wf.app = app
- wf.wFactory, err = app.NewFactory()
- if err != nil {
- return nil
- }
-
- return nil
-}
-
-func (wf *WFactory) AddListener(l events.EventListener) {
- wf.events.AddListener(l)
-}
-
-func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) {
- cmd, err := wf.app.NewCmd(env)
- if err != nil {
- return nil, err
- }
-
- p, err := roadrunner.NewPool(ctx, cmd, wf.wFactory, opt)
- if err != nil {
- return nil, err
- }
-
- // TODO event to stop
- go func() {
- for e := range p.Events() {
- wf.events.Push(e)
- if we, ok := e.Payload.(roadrunner.WorkerEvent); ok {
- if we.Event == roadrunner.EventWorkerLog {
- log.Print(color.YellowString(string(we.Payload.([]byte))))
- }
- }
- }
- }()
-
- return p, nil
-}
-
-func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
- cmd, err := wf.app.NewCmd(env)
- if err != nil {
- return nil, err
- }
-
- return wf.wFactory.SpawnWorkerWithContext(ctx, cmd())
-}
diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php
deleted file mode 100644
index c6199449..00000000
--- a/plugins/factory/hello.php
+++ /dev/null
@@ -1 +0,0 @@
-<?php echo "hello -" . time(); \ No newline at end of file
diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go
index 5ab6df73..f6b06dd0 100644
--- a/plugins/factory/tests/plugin_1.go
+++ b/plugins/factory/tests/plugin_1.go
@@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error {
func (f *Foo) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.AppConfig{}
+ r := &factory.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.spawner.NewCmd(nil)
+ cmd, err := f.spawner.CommandFactory(nil)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go
index 2409627e..dbdb065b 100644
--- a/plugins/factory/tests/plugin_2.go
+++ b/plugins/factory/tests/plugin_2.go
@@ -13,11 +13,11 @@ import (
type Foo2 struct {
configProvider config.Provider
- wf factory.WorkerFactory
+ wf factory.AppFactory
spw factory.Spawner
}
-func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error {
+func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory, spawner factory.Spawner) error {
f.configProvider = p
f.wf = workerFactory
f.spw = spawner
@@ -27,14 +27,14 @@ func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spaw
func (f *Foo2) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.AppConfig{}
+ r := &factory.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.spw.NewCmd(nil)
+ cmd, err := f.spw.CommandFactory(nil)
if err != nil {
errCh <- err
return errCh