diff options
author | Valery Piashchynski <[email protected]> | 2020-10-26 12:01:53 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-10-26 12:01:53 +0300 |
commit | 91cf918b30938129609323ded53e190385e019a6 (patch) | |
tree | 0ad9537bd438c63719fb83343ab77fc4ab34eb83 /plugins | |
parent | 68bf13772c6ddfc5159c2a286e1a38e911614e72 (diff) | |
parent | 9aae9e2009bad07ebdee73e1c6cf56901d07880a (diff) |
Merge pull request #373 from spiral/feature/new-worker-produces-active-worker
Feature/new worker produces active worker
Diffstat (limited to 'plugins')
-rwxr-xr-x | plugins/config/provider.go | 2 | ||||
-rwxr-xr-x | plugins/config/tests/config_test.go | 2 | ||||
-rwxr-xr-x | plugins/config/viper.go | 3 | ||||
-rwxr-xr-x | plugins/events/broadcaster.go | 24 | ||||
-rwxr-xr-x | plugins/factory/app.go | 112 | ||||
-rwxr-xr-x | plugins/factory/app_provider.go | 17 | ||||
-rwxr-xr-x | plugins/factory/config.go | 37 | ||||
-rwxr-xr-x | plugins/factory/factory.go | 76 | ||||
-rwxr-xr-x | plugins/factory/hello.php | 1 | ||||
-rwxr-xr-x | plugins/factory/tests/factory_test.go | 7 | ||||
-rwxr-xr-x | plugins/factory/tests/plugin_1.go | 8 | ||||
-rwxr-xr-x | plugins/factory/tests/plugin_2.go | 24 | ||||
-rwxr-xr-x | plugins/rpc/config.go | 3 | ||||
-rwxr-xr-x | plugins/rpc/rpc.go | 82 | ||||
-rwxr-xr-x | plugins/rpc/rpc_test.go | 1 |
15 files changed, 181 insertions, 218 deletions
diff --git a/plugins/config/provider.go b/plugins/config/provider.go index 580231fd..ac33b3de 100755 --- a/plugins/config/provider.go +++ b/plugins/config/provider.go @@ -1,7 +1,7 @@ package config type Provider interface { - // Unmarshal configuration section into configuration object. + // UnmarshalKey reads configuration section into configuration object. // // func (h *HttpService) Init(cp config.Provider) error { // h.config := &HttpConfig{} diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go index c85a841f..14e60ac2 100755 --- a/plugins/config/tests/config_test.go +++ b/plugins/config/tests/config_test.go @@ -48,7 +48,7 @@ func TestViperProvider_Init(t *testing.T) { for { select { case e := <-errCh: - assert.NoError(t, e.Error.Err) + assert.NoError(t, e.Error) assert.NoError(t, container.Stop()) return case <-c: diff --git a/plugins/config/viper.go b/plugins/config/viper.go index 0c34313c..4e85af6b 100755 --- a/plugins/config/viper.go +++ b/plugins/config/viper.go @@ -14,6 +14,7 @@ type ViperProvider struct { Prefix string } +// Inits config provider. func (v *ViperProvider) Init() error { v.viper = viper.New() @@ -49,7 +50,7 @@ func (v *ViperProvider) Overwrite(values map[string]string) error { return nil } -// +// UnmarshalKey reads configuration section into configuration object. func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error { err := v.viper.UnmarshalKey(name, &out) if err != nil { diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go deleted file mode 100755 index 778b307d..00000000 --- a/plugins/events/broadcaster.go +++ /dev/null @@ -1,24 +0,0 @@ -package events - -type EventListener interface { - Handle(event interface{}) -} - -type EventBroadcaster struct { - listeners []EventListener -} - -func NewEventBroadcaster() *EventBroadcaster { - return &EventBroadcaster{} -} - -func (eb *EventBroadcaster) AddListener(l EventListener) { - // todo: threadcase - eb.listeners = append(eb.listeners, l) -} - -func (eb *EventBroadcaster) Push(e interface{}) { - for _, l := range eb.listeners { - l.Handle(e) - } -} diff --git a/plugins/factory/app.go b/plugins/factory/app.go index e4002963..4951e3df 100755 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -1,58 +1,76 @@ package factory import ( - "errors" + "context" "fmt" + "log" "os" "os/exec" "strings" - "time" + "github.com/fatih/color" + "github.com/spiral/endure/errors" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" ) -// AppConfig config combines factory, pool and cmd configurations. -type AppConfig struct { - Command string - User string - Group string - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. - RelayTimeout time.Duration +const ServiceName = "app" + +type Env map[string]string + +// AppFactory creates workers for the application. +type AppFactory interface { + NewCmdFactory(env Env) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) } +// App manages worker type App struct { - cfg AppConfig - configProvider config.Provider + cfg Config + factory roadrunner.Factory } -func (app *App) Init(provider config.Provider) error { - app.cfg = AppConfig{} - app.configProvider = provider - - err := app.configProvider.UnmarshalKey("app", &app.cfg) +// Init application provider. +func (app *App) Init(cfg config.Provider) error { + err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { return err } + app.cfg.InitDefaults() + + return nil +} + +// Name contains service name. +func (app *App) Name() string { + return ServiceName +} + +func (app *App) Serve() chan error { + errCh := make(chan error, 1) + var err error + + app.factory, err = app.initFactory() + if err != nil { + errCh <- errors.E(errors.Op("init factory"), err) + } + + return errCh +} - if app.cfg.Relay == "" { - app.cfg.Relay = "pipes" +func (app *App) Stop() error { + if app.factory == nil { + return nil } - return nil + return app.factory.Close(context.Background()) } -func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { +func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) { var cmdArgs []string + // create command according to the config cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) @@ -75,15 +93,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { }, nil } -// todo ENV unused -func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { +func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + return app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) +} + +func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + if err != nil { + return nil, err + } + + p.AddListener(func(event interface{}) { + if we, ok := event.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } + }) + + return p, nil +} + +func (app *App) initFactory() (roadrunner.Factory, error) { if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } lsn, err := util.CreateListener(app.cfg.Relay) @@ -98,7 +146,7 @@ func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil default: - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go deleted file mode 100755 index 024c5bea..00000000 --- a/plugins/factory/app_provider.go +++ /dev/null @@ -1,17 +0,0 @@ -package factory - -import ( - "os/exec" - - "github.com/spiral/roadrunner/v2" -) - -type Env map[string]string - -type Spawner interface { - // CmdFactory create new command factory with given env variables. - NewCmd(env Env) (func() *exec.Cmd, error) - - // NewFactory inits new factory for workers. - NewFactory(env Env) (roadrunner.Factory, error) -} diff --git a/plugins/factory/config.go b/plugins/factory/config.go new file mode 100755 index 00000000..b2d1d0ad --- /dev/null +++ b/plugins/factory/config.go @@ -0,0 +1,37 @@ +package factory + +import "time" + +// Config config combines factory, pool and cmd configurations. +type Config struct { + // Command to run as application. + Command string + + // User to run application under. + User string + + // Group to run application under. + Group string + + // Env represents application environment. + Env Env + + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration +} + +func (cfg *Config) InitDefaults() { + if cfg.Relay == "" { + cfg.Relay = "pipes" + } + + if cfg.RelayTimeout == 0 { + cfg.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go deleted file mode 100755 index 5d80682d..00000000 --- a/plugins/factory/factory.go +++ /dev/null @@ -1,76 +0,0 @@ -package factory - -import ( - "context" - - "log" - - "github.com/fatih/color" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/events" -) - -type WorkerFactory interface { - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) -} - -type WFactory struct { - spw Spawner - eb *events.EventBroadcaster -} - -func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { - cmd, err := wf.spw.NewCmd(env) - if err != nil { - return nil, err - } - - factory, err := wf.spw.NewFactory(env) - if err != nil { - return nil, err - } - - p, err := roadrunner.NewPool(ctx, cmd, factory, opt) - if err != nil { - return nil, err - } - - // TODO event to stop - go func() { - for e := range p.Events() { - wf.eb.Push(e) - if we, ok := e.Payload.(roadrunner.WorkerEvent); ok { - if we.Event == roadrunner.EventWorkerLog { - log.Print(color.YellowString(string(we.Payload.([]byte)))) - } - } - } - }() - - return p, nil -} - -func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { - cmd, err := wf.spw.NewCmd(env) - if err != nil { - return nil, err - } - - wb, err := roadrunner.InitBaseWorker(cmd()) - if err != nil { - return nil, err - } - - return wb, nil -} - -func (wf *WFactory) Init(app Spawner) error { - wf.spw = app - wf.eb = events.NewEventBroadcaster() - return nil -} - -func (wf *WFactory) AddListener(l events.EventListener) { - wf.eb.AddListener(l) -} diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php deleted file mode 100755 index c6199449..00000000 --- a/plugins/factory/hello.php +++ /dev/null @@ -1 +0,0 @@ -<?php echo "hello -" . time();
\ No newline at end of file diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go index 5347083a..6c264fd6 100755 --- a/plugins/factory/tests/factory_test.go +++ b/plugins/factory/tests/factory_test.go @@ -31,11 +31,6 @@ func TestFactory(t *testing.T) { t.Fatal(err) } - err = container.Register(&factory.WFactory{}) - if err != nil { - t.Fatal(err) - } - err = container.Register(&Foo{}) if err != nil { t.Fatal(err) @@ -65,7 +60,7 @@ func TestFactory(t *testing.T) { for { select { case e := <-errCh: - assert.NoError(t, e.Error.Err) + assert.NoError(t, e.Error) assert.NoError(t, container.Stop()) return case <-c: diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go index 5ab6df73..9011bb00 100755 --- a/plugins/factory/tests/plugin_1.go +++ b/plugins/factory/tests/plugin_1.go @@ -10,10 +10,10 @@ import ( type Foo struct { configProvider config.Provider - spawner factory.Spawner + spawner factory.AppFactory } -func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { +func (f *Foo) Init(p config.Provider, spw factory.AppFactory) error { f.configProvider = p f.spawner = spw return nil @@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { func (f *Foo) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spawner.NewCmd(nil) + cmd, err := f.spawner.NewCmdFactory(nil) if err != nil { errCh <- err return errCh diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go index 2409627e..9f401bec 100755 --- a/plugins/factory/tests/plugin_2.go +++ b/plugins/factory/tests/plugin_2.go @@ -13,28 +13,26 @@ import ( type Foo2 struct { configProvider config.Provider - wf factory.WorkerFactory - spw factory.Spawner + wf factory.AppFactory } -func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error { +func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory) error { f.configProvider = p f.wf = workerFactory - f.spw = spawner return nil } func (f *Foo2) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spw.NewCmd(nil) + cmd, err := f.wf.NewCmdFactory(nil) if err != nil { errCh <- err return errCh @@ -58,16 +56,18 @@ func (f *Foo2) Serve() chan error { _ = w - poolConfig := &roadrunner.Config{ + poolConfig := roadrunner.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - TTL: 1000, - IdleTTL: 1000, - ExecTTL: time.Second * 10, - MaxPoolMemory: 10000, - MaxWorkerMemory: 10000, + Supervisor: roadrunner.SupervisorConfig{ + WatchTick: 60, + TTL: 1000, + IdleTTL: 10, + ExecTTL: time.Second * 10, + MaxWorkerMemory: 1000, + }, } pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil) diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go index 1039ee5e..719fd5e3 100755 --- a/plugins/rpc/config.go +++ b/plugins/rpc/config.go @@ -12,6 +12,9 @@ import ( type Config struct { // Listen string Listen string + + // Disabled disables RPC service. + Disabled bool } // InitDefaults allows to init blank config with pre-defined set of default values. diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go index 6568eea3..0f6c9753 100755 --- a/plugins/rpc/rpc.go +++ b/plugins/rpc/rpc.go @@ -1,21 +1,24 @@ package rpc import ( - "errors" + "net/rpc" + "github.com/spiral/endure" + "github.com/spiral/endure/errors" "github.com/spiral/goridge/v2" "github.com/spiral/roadrunner/v2/plugins/config" - - "net/rpc" ) -type PluginRpc interface { - Name() string - RpcService() (interface{}, error) +// RPCPluggable declares the ability to create set of public RPC methods. +type RPCPluggable interface { + endure.Named + + // Provides RPC methods for the given service. + RPCService() (interface{}, error) } -// ID contains default service name. -const ID = "rpc" +// ServiceName contains default service name. +const ServiceName = "rpc" type services struct { service interface{} @@ -24,52 +27,48 @@ type services struct { // Service is RPC service. type Service struct { - // TODO do we need a pointer here since all receivers are pointers?? - rpc *rpc.Server - configProvider config.Provider - services []services - config Config - close chan struct{} + rpc *rpc.Server + services []services + config Config + close chan struct{} } // Init rpc service. Must return true if service is enabled. func (s *Service) Init(cfg config.Provider) error { - s.configProvider = cfg - err := s.configProvider.UnmarshalKey(ID, &s.config) + if !cfg.Has(ServiceName) { + return errors.E(errors.Disabled) + } + + err := cfg.UnmarshalKey(ServiceName, &s.config) if err != nil { return err } + s.config.InitDefaults() - // TODO Do we need to init defaults - if s.config.Listen == "" { - s.config.InitDefaults() + if s.config.Disabled { + return errors.E(errors.Disabled) } - s.close = make(chan struct{}) + return s.config.Valid() +} - return nil +// Name contains service name. +func (s *Service) Name() string { + return ServiceName } // Serve serves the service. func (s *Service) Serve() chan error { + s.close = make(chan struct{}, 1) errCh := make(chan error, 1) - server := rpc.NewServer() - if server == nil { - errCh <- errors.New("rpc server is nil") - return errCh - } - s.rpc = server - if len(s.services) == 0 { - errCh <- errors.New("no services with RPC") - return errCh - } + s.rpc = rpc.NewServer() // Attach all services for i := 0; i < len(s.services); i++ { err := s.Register(s.services[i].name, s.services[i].service) if err != nil { - errCh <- err + errCh <- errors.E(errors.Op("register service"), err) return errCh } } @@ -85,7 +84,10 @@ func (s *Service) Serve() chan error { select { case <-s.close: // log error - errCh <- ln.Close() + err := ln.Close() + if err != nil { + errCh <- errors.E(errors.Op("close RPC socket"), err) + } return default: conn, err := ln.Accept() @@ -98,7 +100,7 @@ func (s *Service) Serve() chan error { } }() - return nil + return errCh } // Stop stops the service. @@ -109,12 +111,12 @@ func (s *Service) Stop() error { func (s *Service) Depends() []interface{} { return []interface{}{ - s.RpcService, + s.RegisterService, } } -func (s *Service) RpcService(p PluginRpc) error { - service, err := p.RpcService() +func (s *Service) RegisterService(p RPCPluggable) error { + service, err := p.RPCService() if err != nil { return err } @@ -136,7 +138,7 @@ func (s *Service) RpcService(p PluginRpc) error { // no suitable methods. It also logs the error using package log. func (s *Service) Register(name string, svc interface{}) error { if s.rpc == nil { - return errors.New("RPC service is not configured") + return errors.E("RPC service is not configured") } return s.rpc.RegisterName(name, svc) @@ -144,10 +146,6 @@ func (s *Service) Register(name string, svc interface{}) error { // Client creates new RPC client. func (s *Service) Client() (*rpc.Client, error) { - if s.configProvider == nil { - return nil, errors.New("RPC service is not configured") - } - conn, err := s.config.Dialer() if err != nil { return nil, err diff --git a/plugins/rpc/rpc_test.go b/plugins/rpc/rpc_test.go deleted file mode 100755 index 9ab1e3e8..00000000 --- a/plugins/rpc/rpc_test.go +++ /dev/null @@ -1 +0,0 @@ -package rpc |