summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-26 12:01:53 +0300
committerGitHub <[email protected]>2020-10-26 12:01:53 +0300
commit91cf918b30938129609323ded53e190385e019a6 (patch)
tree0ad9537bd438c63719fb83343ab77fc4ab34eb83 /plugins
parent68bf13772c6ddfc5159c2a286e1a38e911614e72 (diff)
parent9aae9e2009bad07ebdee73e1c6cf56901d07880a (diff)
Merge pull request #373 from spiral/feature/new-worker-produces-active-worker
Feature/new worker produces active worker
Diffstat (limited to 'plugins')
-rwxr-xr-xplugins/config/provider.go2
-rwxr-xr-xplugins/config/tests/config_test.go2
-rwxr-xr-xplugins/config/viper.go3
-rwxr-xr-xplugins/events/broadcaster.go24
-rwxr-xr-xplugins/factory/app.go112
-rwxr-xr-xplugins/factory/app_provider.go17
-rwxr-xr-xplugins/factory/config.go37
-rwxr-xr-xplugins/factory/factory.go76
-rwxr-xr-xplugins/factory/hello.php1
-rwxr-xr-xplugins/factory/tests/factory_test.go7
-rwxr-xr-xplugins/factory/tests/plugin_1.go8
-rwxr-xr-xplugins/factory/tests/plugin_2.go24
-rwxr-xr-xplugins/rpc/config.go3
-rwxr-xr-xplugins/rpc/rpc.go82
-rwxr-xr-xplugins/rpc/rpc_test.go1
15 files changed, 181 insertions, 218 deletions
diff --git a/plugins/config/provider.go b/plugins/config/provider.go
index 580231fd..ac33b3de 100755
--- a/plugins/config/provider.go
+++ b/plugins/config/provider.go
@@ -1,7 +1,7 @@
package config
type Provider interface {
- // Unmarshal configuration section into configuration object.
+ // UnmarshalKey reads configuration section into configuration object.
//
// func (h *HttpService) Init(cp config.Provider) error {
// h.config := &HttpConfig{}
diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go
index c85a841f..14e60ac2 100755
--- a/plugins/config/tests/config_test.go
+++ b/plugins/config/tests/config_test.go
@@ -48,7 +48,7 @@ func TestViperProvider_Init(t *testing.T) {
for {
select {
case e := <-errCh:
- assert.NoError(t, e.Error.Err)
+ assert.NoError(t, e.Error)
assert.NoError(t, container.Stop())
return
case <-c:
diff --git a/plugins/config/viper.go b/plugins/config/viper.go
index 0c34313c..4e85af6b 100755
--- a/plugins/config/viper.go
+++ b/plugins/config/viper.go
@@ -14,6 +14,7 @@ type ViperProvider struct {
Prefix string
}
+// Inits config provider.
func (v *ViperProvider) Init() error {
v.viper = viper.New()
@@ -49,7 +50,7 @@ func (v *ViperProvider) Overwrite(values map[string]string) error {
return nil
}
-//
+// UnmarshalKey reads configuration section into configuration object.
func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error {
err := v.viper.UnmarshalKey(name, &out)
if err != nil {
diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go
deleted file mode 100755
index 778b307d..00000000
--- a/plugins/events/broadcaster.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package events
-
-type EventListener interface {
- Handle(event interface{})
-}
-
-type EventBroadcaster struct {
- listeners []EventListener
-}
-
-func NewEventBroadcaster() *EventBroadcaster {
- return &EventBroadcaster{}
-}
-
-func (eb *EventBroadcaster) AddListener(l EventListener) {
- // todo: threadcase
- eb.listeners = append(eb.listeners, l)
-}
-
-func (eb *EventBroadcaster) Push(e interface{}) {
- for _, l := range eb.listeners {
- l.Handle(e)
- }
-}
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index e4002963..4951e3df 100755
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -1,58 +1,76 @@
package factory
import (
- "errors"
+ "context"
"fmt"
+ "log"
"os"
"os/exec"
"strings"
- "time"
+ "github.com/fatih/color"
+ "github.com/spiral/endure/errors"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
-// AppConfig config combines factory, pool and cmd configurations.
-type AppConfig struct {
- Command string
- User string
- Group string
- Env Env
-
- // Listen defines connection method and factory to be used to connect to workers:
- // "pipes", "tcp://:6001", "unix://rr.sock"
- // This config section must not change on re-configuration.
- Relay string
-
- // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
- // must not change on re-configuration.
- RelayTimeout time.Duration
+const ServiceName = "app"
+
+type Env map[string]string
+
+// AppFactory creates workers for the application.
+type AppFactory interface {
+ NewCmdFactory(env Env) (func() *exec.Cmd, error)
+ NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
+ NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error)
}
+// App manages worker
type App struct {
- cfg AppConfig
- configProvider config.Provider
+ cfg Config
+ factory roadrunner.Factory
}
-func (app *App) Init(provider config.Provider) error {
- app.cfg = AppConfig{}
- app.configProvider = provider
-
- err := app.configProvider.UnmarshalKey("app", &app.cfg)
+// Init application provider.
+func (app *App) Init(cfg config.Provider) error {
+ err := cfg.UnmarshalKey(ServiceName, &app.cfg)
if err != nil {
return err
}
+ app.cfg.InitDefaults()
+
+ return nil
+}
+
+// Name contains service name.
+func (app *App) Name() string {
+ return ServiceName
+}
+
+func (app *App) Serve() chan error {
+ errCh := make(chan error, 1)
+ var err error
+
+ app.factory, err = app.initFactory()
+ if err != nil {
+ errCh <- errors.E(errors.Op("init factory"), err)
+ }
+
+ return errCh
+}
- if app.cfg.Relay == "" {
- app.cfg.Relay = "pipes"
+func (app *App) Stop() error {
+ if app.factory == nil {
+ return nil
}
- return nil
+ return app.factory.Close(context.Background())
}
-func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
+func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) {
var cmdArgs []string
+
// create command according to the config
cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...)
@@ -75,15 +93,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
}, nil
}
-// todo ENV unused
-func (app *App) NewFactory(env Env) (roadrunner.Factory, error) {
+func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+ spawnCmd, err := app.NewCmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ return app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+}
+
+func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
+ spawnCmd, err := app.NewCmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt)
+ if err != nil {
+ return nil, err
+ }
+
+ p.AddListener(func(event interface{}) {
+ if we, ok := event.(roadrunner.WorkerEvent); ok {
+ if we.Event == roadrunner.EventWorkerLog {
+ log.Print(color.YellowString(string(we.Payload.([]byte))))
+ }
+ }
+ })
+
+ return p, nil
+}
+
+func (app *App) initFactory() (roadrunner.Factory, error) {
if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
}
dsn := strings.Split(app.cfg.Relay, "://")
if len(dsn) != 2 {
- return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
lsn, err := util.CreateListener(app.cfg.Relay)
@@ -98,7 +146,7 @@ func (app *App) NewFactory(env Env) (roadrunner.Factory, error) {
case "tcp":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
default:
- return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go
deleted file mode 100755
index 024c5bea..00000000
--- a/plugins/factory/app_provider.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package factory
-
-import (
- "os/exec"
-
- "github.com/spiral/roadrunner/v2"
-)
-
-type Env map[string]string
-
-type Spawner interface {
- // CmdFactory create new command factory with given env variables.
- NewCmd(env Env) (func() *exec.Cmd, error)
-
- // NewFactory inits new factory for workers.
- NewFactory(env Env) (roadrunner.Factory, error)
-}
diff --git a/plugins/factory/config.go b/plugins/factory/config.go
new file mode 100755
index 00000000..b2d1d0ad
--- /dev/null
+++ b/plugins/factory/config.go
@@ -0,0 +1,37 @@
+package factory
+
+import "time"
+
+// Config config combines factory, pool and cmd configurations.
+type Config struct {
+ // Command to run as application.
+ Command string
+
+ // User to run application under.
+ User string
+
+ // Group to run application under.
+ Group string
+
+ // Env represents application environment.
+ Env Env
+
+ // Listen defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string
+
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration. Defaults to 60s.
+ RelayTimeout time.Duration
+}
+
+func (cfg *Config) InitDefaults() {
+ if cfg.Relay == "" {
+ cfg.Relay = "pipes"
+ }
+
+ if cfg.RelayTimeout == 0 {
+ cfg.RelayTimeout = time.Second * 60
+ }
+}
diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go
deleted file mode 100755
index 5d80682d..00000000
--- a/plugins/factory/factory.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package factory
-
-import (
- "context"
-
- "log"
-
- "github.com/fatih/color"
- "github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/plugins/events"
-)
-
-type WorkerFactory interface {
- NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
- NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error)
-}
-
-type WFactory struct {
- spw Spawner
- eb *events.EventBroadcaster
-}
-
-func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) {
- cmd, err := wf.spw.NewCmd(env)
- if err != nil {
- return nil, err
- }
-
- factory, err := wf.spw.NewFactory(env)
- if err != nil {
- return nil, err
- }
-
- p, err := roadrunner.NewPool(ctx, cmd, factory, opt)
- if err != nil {
- return nil, err
- }
-
- // TODO event to stop
- go func() {
- for e := range p.Events() {
- wf.eb.Push(e)
- if we, ok := e.Payload.(roadrunner.WorkerEvent); ok {
- if we.Event == roadrunner.EventWorkerLog {
- log.Print(color.YellowString(string(we.Payload.([]byte))))
- }
- }
- }
- }()
-
- return p, nil
-}
-
-func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
- cmd, err := wf.spw.NewCmd(env)
- if err != nil {
- return nil, err
- }
-
- wb, err := roadrunner.InitBaseWorker(cmd())
- if err != nil {
- return nil, err
- }
-
- return wb, nil
-}
-
-func (wf *WFactory) Init(app Spawner) error {
- wf.spw = app
- wf.eb = events.NewEventBroadcaster()
- return nil
-}
-
-func (wf *WFactory) AddListener(l events.EventListener) {
- wf.eb.AddListener(l)
-}
diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php
deleted file mode 100755
index c6199449..00000000
--- a/plugins/factory/hello.php
+++ /dev/null
@@ -1 +0,0 @@
-<?php echo "hello -" . time(); \ No newline at end of file
diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go
index 5347083a..6c264fd6 100755
--- a/plugins/factory/tests/factory_test.go
+++ b/plugins/factory/tests/factory_test.go
@@ -31,11 +31,6 @@ func TestFactory(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&factory.WFactory{})
- if err != nil {
- t.Fatal(err)
- }
-
err = container.Register(&Foo{})
if err != nil {
t.Fatal(err)
@@ -65,7 +60,7 @@ func TestFactory(t *testing.T) {
for {
select {
case e := <-errCh:
- assert.NoError(t, e.Error.Err)
+ assert.NoError(t, e.Error)
assert.NoError(t, container.Stop())
return
case <-c:
diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go
index 5ab6df73..9011bb00 100755
--- a/plugins/factory/tests/plugin_1.go
+++ b/plugins/factory/tests/plugin_1.go
@@ -10,10 +10,10 @@ import (
type Foo struct {
configProvider config.Provider
- spawner factory.Spawner
+ spawner factory.AppFactory
}
-func (f *Foo) Init(p config.Provider, spw factory.Spawner) error {
+func (f *Foo) Init(p config.Provider, spw factory.AppFactory) error {
f.configProvider = p
f.spawner = spw
return nil
@@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error {
func (f *Foo) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.AppConfig{}
+ r := &factory.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.spawner.NewCmd(nil)
+ cmd, err := f.spawner.NewCmdFactory(nil)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go
index 2409627e..9f401bec 100755
--- a/plugins/factory/tests/plugin_2.go
+++ b/plugins/factory/tests/plugin_2.go
@@ -13,28 +13,26 @@ import (
type Foo2 struct {
configProvider config.Provider
- wf factory.WorkerFactory
- spw factory.Spawner
+ wf factory.AppFactory
}
-func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error {
+func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory) error {
f.configProvider = p
f.wf = workerFactory
- f.spw = spawner
return nil
}
func (f *Foo2) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.AppConfig{}
+ r := &factory.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.spw.NewCmd(nil)
+ cmd, err := f.wf.NewCmdFactory(nil)
if err != nil {
errCh <- err
return errCh
@@ -58,16 +56,18 @@ func (f *Foo2) Serve() chan error {
_ = w
- poolConfig := &roadrunner.Config{
+ poolConfig := roadrunner.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- TTL: 1000,
- IdleTTL: 1000,
- ExecTTL: time.Second * 10,
- MaxPoolMemory: 10000,
- MaxWorkerMemory: 10000,
+ Supervisor: roadrunner.SupervisorConfig{
+ WatchTick: 60,
+ TTL: 1000,
+ IdleTTL: 10,
+ ExecTTL: time.Second * 10,
+ MaxWorkerMemory: 1000,
+ },
}
pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil)
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go
index 1039ee5e..719fd5e3 100755
--- a/plugins/rpc/config.go
+++ b/plugins/rpc/config.go
@@ -12,6 +12,9 @@ import (
type Config struct {
// Listen string
Listen string
+
+ // Disabled disables RPC service.
+ Disabled bool
}
// InitDefaults allows to init blank config with pre-defined set of default values.
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
index 6568eea3..0f6c9753 100755
--- a/plugins/rpc/rpc.go
+++ b/plugins/rpc/rpc.go
@@ -1,21 +1,24 @@
package rpc
import (
- "errors"
+ "net/rpc"
+ "github.com/spiral/endure"
+ "github.com/spiral/endure/errors"
"github.com/spiral/goridge/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
-
- "net/rpc"
)
-type PluginRpc interface {
- Name() string
- RpcService() (interface{}, error)
+// RPCPluggable declares the ability to create set of public RPC methods.
+type RPCPluggable interface {
+ endure.Named
+
+ // Provides RPC methods for the given service.
+ RPCService() (interface{}, error)
}
-// ID contains default service name.
-const ID = "rpc"
+// ServiceName contains default service name.
+const ServiceName = "rpc"
type services struct {
service interface{}
@@ -24,52 +27,48 @@ type services struct {
// Service is RPC service.
type Service struct {
- // TODO do we need a pointer here since all receivers are pointers??
- rpc *rpc.Server
- configProvider config.Provider
- services []services
- config Config
- close chan struct{}
+ rpc *rpc.Server
+ services []services
+ config Config
+ close chan struct{}
}
// Init rpc service. Must return true if service is enabled.
func (s *Service) Init(cfg config.Provider) error {
- s.configProvider = cfg
- err := s.configProvider.UnmarshalKey(ID, &s.config)
+ if !cfg.Has(ServiceName) {
+ return errors.E(errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(ServiceName, &s.config)
if err != nil {
return err
}
+ s.config.InitDefaults()
- // TODO Do we need to init defaults
- if s.config.Listen == "" {
- s.config.InitDefaults()
+ if s.config.Disabled {
+ return errors.E(errors.Disabled)
}
- s.close = make(chan struct{})
+ return s.config.Valid()
+}
- return nil
+// Name contains service name.
+func (s *Service) Name() string {
+ return ServiceName
}
// Serve serves the service.
func (s *Service) Serve() chan error {
+ s.close = make(chan struct{}, 1)
errCh := make(chan error, 1)
- server := rpc.NewServer()
- if server == nil {
- errCh <- errors.New("rpc server is nil")
- return errCh
- }
- s.rpc = server
- if len(s.services) == 0 {
- errCh <- errors.New("no services with RPC")
- return errCh
- }
+ s.rpc = rpc.NewServer()
// Attach all services
for i := 0; i < len(s.services); i++ {
err := s.Register(s.services[i].name, s.services[i].service)
if err != nil {
- errCh <- err
+ errCh <- errors.E(errors.Op("register service"), err)
return errCh
}
}
@@ -85,7 +84,10 @@ func (s *Service) Serve() chan error {
select {
case <-s.close:
// log error
- errCh <- ln.Close()
+ err := ln.Close()
+ if err != nil {
+ errCh <- errors.E(errors.Op("close RPC socket"), err)
+ }
return
default:
conn, err := ln.Accept()
@@ -98,7 +100,7 @@ func (s *Service) Serve() chan error {
}
}()
- return nil
+ return errCh
}
// Stop stops the service.
@@ -109,12 +111,12 @@ func (s *Service) Stop() error {
func (s *Service) Depends() []interface{} {
return []interface{}{
- s.RpcService,
+ s.RegisterService,
}
}
-func (s *Service) RpcService(p PluginRpc) error {
- service, err := p.RpcService()
+func (s *Service) RegisterService(p RPCPluggable) error {
+ service, err := p.RPCService()
if err != nil {
return err
}
@@ -136,7 +138,7 @@ func (s *Service) RpcService(p PluginRpc) error {
// no suitable methods. It also logs the error using package log.
func (s *Service) Register(name string, svc interface{}) error {
if s.rpc == nil {
- return errors.New("RPC service is not configured")
+ return errors.E("RPC service is not configured")
}
return s.rpc.RegisterName(name, svc)
@@ -144,10 +146,6 @@ func (s *Service) Register(name string, svc interface{}) error {
// Client creates new RPC client.
func (s *Service) Client() (*rpc.Client, error) {
- if s.configProvider == nil {
- return nil, errors.New("RPC service is not configured")
- }
-
conn, err := s.config.Dialer()
if err != nil {
return nil, err
diff --git a/plugins/rpc/rpc_test.go b/plugins/rpc/rpc_test.go
deleted file mode 100755
index 9ab1e3e8..00000000
--- a/plugins/rpc/rpc_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package rpc