diff options
author | Wolfy-J <[email protected]> | 2020-10-25 15:55:51 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2020-10-25 15:55:51 +0300 |
commit | ba5c562f9038ba434e655fb82c44597fcccaff16 (patch) | |
tree | ff112b9dcffda63bc40094a57d0df61622368445 /plugins | |
parent | 3bdf7d02d83d1ff4726f3fbb01a45d016f39abec (diff) |
- massive update in roadrunner 2.0 abstractions
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/config/provider.go | 2 | ||||
-rw-r--r-- | plugins/config/viper.go | 3 | ||||
-rw-r--r-- | plugins/events/broadcaster.go | 24 | ||||
-rw-r--r-- | plugins/factory/app.go | 114 | ||||
-rw-r--r-- | plugins/factory/app_provider.go | 17 | ||||
-rw-r--r-- | plugins/factory/config.go | 37 | ||||
-rw-r--r-- | plugins/factory/factory.go | 73 | ||||
-rw-r--r-- | plugins/factory/hello.php | 1 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_1.go | 4 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_2.go | 8 | ||||
-rw-r--r-- | plugins/rpc/config.go | 3 | ||||
-rw-r--r-- | plugins/rpc/rpc.go | 36 |
12 files changed, 141 insertions, 181 deletions
diff --git a/plugins/config/provider.go b/plugins/config/provider.go index 580231fd..ac33b3de 100644 --- a/plugins/config/provider.go +++ b/plugins/config/provider.go @@ -1,7 +1,7 @@ package config type Provider interface { - // Unmarshal configuration section into configuration object. + // UnmarshalKey reads configuration section into configuration object. // // func (h *HttpService) Init(cp config.Provider) error { // h.config := &HttpConfig{} diff --git a/plugins/config/viper.go b/plugins/config/viper.go index 0c34313c..4e85af6b 100644 --- a/plugins/config/viper.go +++ b/plugins/config/viper.go @@ -14,6 +14,7 @@ type ViperProvider struct { Prefix string } +// Inits config provider. func (v *ViperProvider) Init() error { v.viper = viper.New() @@ -49,7 +50,7 @@ func (v *ViperProvider) Overwrite(values map[string]string) error { return nil } -// +// UnmarshalKey reads configuration section into configuration object. func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error { err := v.viper.UnmarshalKey(name, &out) if err != nil { diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go deleted file mode 100644 index 778b307d..00000000 --- a/plugins/events/broadcaster.go +++ /dev/null @@ -1,24 +0,0 @@ -package events - -type EventListener interface { - Handle(event interface{}) -} - -type EventBroadcaster struct { - listeners []EventListener -} - -func NewEventBroadcaster() *EventBroadcaster { - return &EventBroadcaster{} -} - -func (eb *EventBroadcaster) AddListener(l EventListener) { - // todo: threadcase - eb.listeners = append(eb.listeners, l) -} - -func (eb *EventBroadcaster) Push(e interface{}) { - for _, l := range eb.listeners { - l.Handle(e) - } -} diff --git a/plugins/factory/app.go b/plugins/factory/app.go index 74d8d828..b6cdb3b3 100644 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -1,58 +1,70 @@ package factory import ( - "errors" + "context" "fmt" - "os" - "os/exec" - "strings" - "time" - + "github.com/fatih/color" + "github.com/spiral/endure/errors" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" + "log" + "os" + "os/exec" + "strings" ) -// AppConfig config combines factory, pool and cmd configurations. -type AppConfig struct { - Command string - User string - Group string - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. - RelayTimeout time.Duration +const ServiceName = "app" + +type Env map[string]string + +// AppFactory creates workers for the application. +type AppFactory interface { + NewCmdFactory(env Env) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) } +// App manages worker type App struct { - cfg AppConfig - configProvider config.Provider + cfg Config + factory roadrunner.Factory } -func (app *App) Init(provider config.Provider) error { - app.cfg = AppConfig{} - app.configProvider = provider - - err := app.configProvider.UnmarshalKey("app", &app.cfg) +// Init application provider. +func (app *App) Init(cfg config.Provider) error { + err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { return err } + app.cfg.InitDefaults() - if app.cfg.Relay == "" { - app.cfg.Relay = "pipes" + return nil +} + +func (app *App) Serve() chan error { + errCh := make(chan error, 1) + var err error + + app.factory, err = app.initFactory() + if err != nil { + errCh <- errors.E(errors.Op("init factory"), err) } - return nil + return errCh +} + +func (app *App) Stop() error { + if app.factory == nil { + return nil + } + + return app.factory.Close(context.Background()) } -func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { +func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) { var cmdArgs []string + // create command according to the config cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) @@ -75,15 +87,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { }, nil } -// todo ENV unused -func (app *App) NewFactory() (roadrunner.Factory, error) { +func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + return app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) +} + +func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + if err != nil { + return nil, err + } + + p.AddListener(func(event interface{}) { + if we, ok := event.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } + }) + + return p, nil +} + +func (app *App) initFactory() (roadrunner.Factory, error) { if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } lsn, err := util.CreateListener(app.cfg.Relay) @@ -98,7 +140,7 @@ func (app *App) NewFactory() (roadrunner.Factory, error) { case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil default: - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go deleted file mode 100644 index e13b267f..00000000 --- a/plugins/factory/app_provider.go +++ /dev/null @@ -1,17 +0,0 @@ -package factory - -import ( - "os/exec" - - "github.com/spiral/roadrunner/v2" -) - -type Env map[string]string - -type Spawner interface { - // CmdFactory create new command factory with given env variables. - NewCmd(env Env) (func() *exec.Cmd, error) - - // NewFactory inits new factory for workers. - NewFactory() (roadrunner.Factory, error) -} diff --git a/plugins/factory/config.go b/plugins/factory/config.go new file mode 100644 index 00000000..b2d1d0ad --- /dev/null +++ b/plugins/factory/config.go @@ -0,0 +1,37 @@ +package factory + +import "time" + +// Config config combines factory, pool and cmd configurations. +type Config struct { + // Command to run as application. + Command string + + // User to run application under. + User string + + // Group to run application under. + Group string + + // Env represents application environment. + Env Env + + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration +} + +func (cfg *Config) InitDefaults() { + if cfg.Relay == "" { + cfg.Relay = "pipes" + } + + if cfg.RelayTimeout == 0 { + cfg.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go deleted file mode 100644 index f7303b6d..00000000 --- a/plugins/factory/factory.go +++ /dev/null @@ -1,73 +0,0 @@ -package factory - -import ( - "context" - - "log" - - "github.com/fatih/color" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/events" -) - -type WorkerFactory interface { - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) -} - -type WFactory struct { - events *events.EventBroadcaster - app Spawner - wFactory roadrunner.Factory -} - -func (wf *WFactory) Init(app Spawner) (err error) { - wf.events = events.NewEventBroadcaster() - - wf.app = app - wf.wFactory, err = app.NewFactory() - if err != nil { - return nil - } - - return nil -} - -func (wf *WFactory) AddListener(l events.EventListener) { - wf.events.AddListener(l) -} - -func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { - cmd, err := wf.app.NewCmd(env) - if err != nil { - return nil, err - } - - p, err := roadrunner.NewPool(ctx, cmd, wf.wFactory, opt) - if err != nil { - return nil, err - } - - // TODO event to stop - go func() { - for e := range p.Events() { - wf.events.Push(e) - if we, ok := e.Payload.(roadrunner.WorkerEvent); ok { - if we.Event == roadrunner.EventWorkerLog { - log.Print(color.YellowString(string(we.Payload.([]byte)))) - } - } - } - }() - - return p, nil -} - -func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { - cmd, err := wf.app.NewCmd(env) - if err != nil { - return nil, err - } - - return wf.wFactory.SpawnWorkerWithContext(ctx, cmd()) -} diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php deleted file mode 100644 index c6199449..00000000 --- a/plugins/factory/hello.php +++ /dev/null @@ -1 +0,0 @@ -<?php echo "hello -" . time();
\ No newline at end of file diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go index 5ab6df73..f6b06dd0 100644 --- a/plugins/factory/tests/plugin_1.go +++ b/plugins/factory/tests/plugin_1.go @@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { func (f *Foo) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spawner.NewCmd(nil) + cmd, err := f.spawner.CommandFactory(nil) if err != nil { errCh <- err return errCh diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go index 2409627e..dbdb065b 100644 --- a/plugins/factory/tests/plugin_2.go +++ b/plugins/factory/tests/plugin_2.go @@ -13,11 +13,11 @@ import ( type Foo2 struct { configProvider config.Provider - wf factory.WorkerFactory + wf factory.AppFactory spw factory.Spawner } -func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error { +func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory, spawner factory.Spawner) error { f.configProvider = p f.wf = workerFactory f.spw = spawner @@ -27,14 +27,14 @@ func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spaw func (f *Foo2) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spw.NewCmd(nil) + cmd, err := f.spw.CommandFactory(nil) if err != nil { errCh <- err return errCh diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go index 1039ee5e..719fd5e3 100644 --- a/plugins/rpc/config.go +++ b/plugins/rpc/config.go @@ -12,6 +12,9 @@ import ( type Config struct { // Listen string Listen string + + // Disabled disables RPC service. + Disabled bool } // InitDefaults allows to init blank config with pre-defined set of default values. diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go index ef8e82e4..7b47682f 100644 --- a/plugins/rpc/rpc.go +++ b/plugins/rpc/rpc.go @@ -1,8 +1,7 @@ package rpc import ( - "errors" - + "github.com/spiral/endure/errors" "github.com/spiral/goridge/v2" "github.com/spiral/roadrunner/v2/plugins/config" @@ -14,8 +13,8 @@ type RPCService interface { RPCService() (interface{}, error) } -// ID contains default service name. -const ID = "rpc" +// ServiceName contains default service name. +const ServiceName = "rpc" type services struct { service interface{} @@ -32,14 +31,19 @@ type Service struct { // Init rpc service. Must return true if service is enabled. func (s *Service) Init(cfg config.Provider) error { - err := cfg.UnmarshalKey(ID, &s.config) + if !cfg.Has(ServiceName) { + return errors.E(errors.Disabled) + } + + err := cfg.UnmarshalKey(ServiceName, &s.config) if err != nil { return err } - s.config.InitDefaults() - // todo: handle disabled + if s.config.Disabled { + return errors.E(errors.Disabled) + } return s.config.Valid() } @@ -47,27 +51,15 @@ func (s *Service) Init(cfg config.Provider) error { // Serve serves the service. func (s *Service) Serve() chan error { s.close = make(chan struct{}) - errCh := make(chan error, 1) - server := rpc.NewServer() - if server == nil { - errCh <- errors.New("rpc server is nil") - return errCh - } - s.rpc = server - - if len(s.services) == 0 { - // todo: why this is an error? - errCh <- errors.New("no services with RPC") - return errCh - } + s.rpc = rpc.NewServer() // Attach all services for i := 0; i < len(s.services); i++ { err := s.Register(s.services[i].name, s.services[i].service) if err != nil { - errCh <- err + errCh <- errors.E(errors.Op("register service"), err) return errCh } } @@ -134,7 +126,7 @@ func (s *Service) RegisterService(p RPCService) error { // no suitable methods. It also logs the error using package log. func (s *Service) Register(name string, svc interface{}) error { if s.rpc == nil { - return errors.New("RPC service is not configured") + return errors.E("RPC service is not configured") } return s.rpc.RegisterName(name, svc) |