author     Wolfy-J <[email protected]>   2020-10-25 15:55:51 +0300
committer  Wolfy-J <[email protected]>   2020-10-25 15:55:51 +0300
commit     ba5c562f9038ba434e655fb82c44597fcccaff16
tree       ff112b9dcffda63bc40094a57d0df61622368445
parent     3bdf7d02d83d1ff4726f3fbb01a45d016f39abec
- massive update in roadrunner 2.0 abstractions
-rw-r--r--  errors.go                                            |   6
-rw-r--r--  errors_test.go                                       |   2
-rw-r--r--  go.mod                                               |   6
-rw-r--r--  go.sum                                               |   1
-rw-r--r--  pipe_factory_test.go                                 |   8
-rw-r--r--  plugins/config/provider.go                           |   2
-rw-r--r--  plugins/config/viper.go                              |   3
-rw-r--r--  plugins/events/broadcaster.go                        |  24
-rw-r--r--  plugins/factory/app.go                               | 114
-rw-r--r--  plugins/factory/app_provider.go                      |  17
-rw-r--r--  plugins/factory/config.go                            |  37
-rw-r--r--  plugins/factory/factory.go                           |  73
-rw-r--r--  plugins/factory/hello.php                            |   1
-rw-r--r--  plugins/factory/tests/plugin_1.go                    |   4
-rw-r--r--  plugins/factory/tests/plugin_2.go                    |   8
-rw-r--r--  plugins/rpc/config.go                                |   3
-rw-r--r--  plugins/rpc/rpc.go                                   |  36
-rw-r--r--  pool.go                                              |  96
-rw-r--r--  pool_supervisor.go                                   | 182
-rw-r--r--  pool_watcher.go                                      | 131
-rw-r--r--  socket_factory.go                                    |   8
-rw-r--r--  socket_factory_test.go                               |  61
-rw-r--r--  static_pool.go                                       | 210
-rw-r--r--  static_pool_test.go                                  | 114
-rw-r--r--  sync_worker.go                                       | 110
-rw-r--r--  sync_worker_test.go                                  |  62
-rw-r--r--  util/events.go                                       |  26
-rw-r--r--  worker.go                                            |  99
-rw-r--r--  worker_test.go                                       |   3
-rw-r--r--  worker_watcher.go (renamed from workers_watcher.go)  | 158
30 files changed, 758 insertions(+), 847 deletions(-)
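The core abstraction change in this commit is the move away from channel-based events (plugins/events.EventBroadcaster and Pool.Events()) to callback listeners in util/events.go. The sketch below reconstructs that fan-out pattern from the types visible in the diff (util/events.go is only partially shown), so it is an illustration of the pattern rather than the exact library code.

```go
package main

import "fmt"

// EventListener mirrors util.EventListener from this commit: a plain callback
// instead of a channel the caller must keep draining.
type EventListener func(event interface{})

// EventHandler is a minimal reconstruction of util.EventHandler: it fans each
// pushed event out to every registered listener.
type EventHandler struct {
	listeners []EventListener
}

// AddListener registers a callback for future events.
func (eh *EventHandler) AddListener(l EventListener) {
	eh.listeners = append(eh.listeners, l)
}

// Push delivers an event to all listeners, synchronously.
func (eh *EventHandler) Push(e interface{}) {
	for _, l := range eh.listeners {
		l(e)
	}
}

// PoolEvent mirrors the new payload struct added to pool.go.
type PoolEvent struct {
	Event   int64
	Payload interface{}
}

func main() {
	eh := &EventHandler{}

	// Consumers branch on the concrete event type, as the updated tests do.
	eh.AddListener(func(event interface{}) {
		if pe, ok := event.(PoolEvent); ok {
			fmt.Printf("pool event %d: %v\n", pe.Event, pe.Payload)
		}
	})

	// 7800 corresponds to EventWorkerConstruct in the new constant block.
	eh.Push(PoolEvent{Event: 7800, Payload: "worker constructed"})
}
```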
@@ -1,11 +1,11 @@ package roadrunner -// TaskError is job level error (no WorkerProcess halt), wraps at top +// JobError is job level error (no WorkerProcess halt), wraps at top // of error context -type TaskError []byte +type JobError []byte // Error converts error context to string -func (te TaskError) Error() string { +func (te JobError) Error() string { return string(te) } diff --git a/errors_test.go b/errors_test.go index 69f1c9ec..75a86840 100644 --- a/errors_test.go +++ b/errors_test.go @@ -8,7 +8,7 @@ import ( ) func Test_JobError_Error(t *testing.T) { - e := TaskError([]byte("error")) + e := JobError([]byte("error")) assert.Equal(t, "error", e.Error()) } @@ -10,7 +10,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/shirou/gopsutil v2.20.9+incompatible github.com/spf13/viper v1.7.1 - github.com/spiral/endure v1.0.0-beta8 + github.com/spiral/endure v1.0.0-beta9 github.com/spiral/goridge/v2 v2.4.5 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a @@ -20,3 +20,7 @@ require ( gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v2 v2.2.5 // indirect ) + +replace ( + github.com/spiral/endure v1.0.0-beta9 => ./../endure +)
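errors.go renames TaskError to JobError but keeps the same shape: a []byte wrapping the error context returned by a worker, signalling a job-level failure that does not halt the worker process. The updated StaticPool.Exec distinguishes this soft error from hard ones with a type assertion; a minimal sketch of that check follows (handleExecErr is a hypothetical helper, not part of the commit).

```go
package main

import "fmt"

// JobError is the renamed job-level error: the raw error context from the
// worker, returned without stopping the worker process.
type JobError []byte

// Error converts the error context to a string.
func (je JobError) Error() string { return string(je) }

// handleExecErr sketches the soft-error branch the pool uses after Exec:
// a JobError keeps the worker in the pool, anything else invalidates it.
// (Hypothetical helper for illustration only.)
func handleExecErr(err error) string {
	if _, ok := err.(JobError); ok {
		return "soft job error: push the worker back to the pool"
	}
	return "hard error: stop the worker and allocate a new one"
}

func main() {
	fmt.Println(handleExecErr(JobError([]byte("validation failed"))))
	fmt.Println(handleExecErr(fmt.Errorf("relay is broken")))
}
```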
\ No newline at end of file @@ -186,6 +186,7 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spiral/endure v1.0.0-beta8 h1:bKVe7F8CbvDZt8UYX3WoSV49OpFPHiM9Py55i7USPK8= github.com/spiral/endure v1.0.0-beta8/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ= +github.com/spiral/endure v1.0.0-beta9/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ= github.com/spiral/goridge/v2 v2.4.5 h1:rg4lLEJLrEh1Wj6G1qTsYVbYiQvig6mOR1F9GyDIGm8= github.com/spiral/goridge/v2 v2.4.5/go.mod h1:C/EZKFPON9lypi8QO7I5ObgVmrIzTmhZqFz/tmypcGc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pipe_factory_test.go b/pipe_factory_test.go index 95eededa..ee2510f3 100644 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -101,7 +101,7 @@ func Test_Pipe_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -129,7 +129,7 @@ func Test_Pipe_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -178,7 +178,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(context.Background(), Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -205,7 +205,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/plugins/config/provider.go b/plugins/config/provider.go index 580231fd..ac33b3de 100644 --- a/plugins/config/provider.go +++ b/plugins/config/provider.go @@ -1,7 +1,7 @@ package config type Provider interface { - // Unmarshal configuration section into configuration object. + // UnmarshalKey reads configuration section into configuration object. // // func (h *HttpService) Init(cp config.Provider) error { // h.config := &HttpConfig{} diff --git a/plugins/config/viper.go b/plugins/config/viper.go index 0c34313c..4e85af6b 100644 --- a/plugins/config/viper.go +++ b/plugins/config/viper.go @@ -14,6 +14,7 @@ type ViperProvider struct { Prefix string } +// Inits config provider. func (v *ViperProvider) Init() error { v.viper = viper.New() @@ -49,7 +50,7 @@ func (v *ViperProvider) Overwrite(values map[string]string) error { return nil } -// +// UnmarshalKey reads configuration section into configuration object. 
func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error { err := v.viper.UnmarshalKey(name, &out) if err != nil { diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go deleted file mode 100644 index 778b307d..00000000 --- a/plugins/events/broadcaster.go +++ /dev/null @@ -1,24 +0,0 @@ -package events - -type EventListener interface { - Handle(event interface{}) -} - -type EventBroadcaster struct { - listeners []EventListener -} - -func NewEventBroadcaster() *EventBroadcaster { - return &EventBroadcaster{} -} - -func (eb *EventBroadcaster) AddListener(l EventListener) { - // todo: threadcase - eb.listeners = append(eb.listeners, l) -} - -func (eb *EventBroadcaster) Push(e interface{}) { - for _, l := range eb.listeners { - l.Handle(e) - } -} diff --git a/plugins/factory/app.go b/plugins/factory/app.go index 74d8d828..b6cdb3b3 100644 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -1,58 +1,70 @@ package factory import ( - "errors" + "context" "fmt" - "os" - "os/exec" - "strings" - "time" - + "github.com/fatih/color" + "github.com/spiral/endure/errors" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" + "log" + "os" + "os/exec" + "strings" ) -// AppConfig config combines factory, pool and cmd configurations. -type AppConfig struct { - Command string - User string - Group string - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. - RelayTimeout time.Duration +const ServiceName = "app" + +type Env map[string]string + +// AppFactory creates workers for the application. +type AppFactory interface { + NewCmdFactory(env Env) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) } +// App manages worker type App struct { - cfg AppConfig - configProvider config.Provider + cfg Config + factory roadrunner.Factory } -func (app *App) Init(provider config.Provider) error { - app.cfg = AppConfig{} - app.configProvider = provider - - err := app.configProvider.UnmarshalKey("app", &app.cfg) +// Init application provider. +func (app *App) Init(cfg config.Provider) error { + err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { return err } + app.cfg.InitDefaults() - if app.cfg.Relay == "" { - app.cfg.Relay = "pipes" + return nil +} + +func (app *App) Serve() chan error { + errCh := make(chan error, 1) + var err error + + app.factory, err = app.initFactory() + if err != nil { + errCh <- errors.E(errors.Op("init factory"), err) } - return nil + return errCh +} + +func (app *App) Stop() error { + if app.factory == nil { + return nil + } + + return app.factory.Close(context.Background()) } -func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { +func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) { var cmdArgs []string + // create command according to the config cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) 
@@ -75,15 +87,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { }, nil } -// todo ENV unused -func (app *App) NewFactory() (roadrunner.Factory, error) { +func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + return app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) +} + +func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + if err != nil { + return nil, err + } + + p.AddListener(func(event interface{}) { + if we, ok := event.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } + }) + + return p, nil +} + +func (app *App) initFactory() (roadrunner.Factory, error) { if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } lsn, err := util.CreateListener(app.cfg.Relay) @@ -98,7 +140,7 @@ func (app *App) NewFactory() (roadrunner.Factory, error) { case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil default: - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go deleted file mode 100644 index e13b267f..00000000 --- a/plugins/factory/app_provider.go +++ /dev/null @@ -1,17 +0,0 @@ -package factory - -import ( - "os/exec" - - "github.com/spiral/roadrunner/v2" -) - -type Env map[string]string - -type Spawner interface { - // CmdFactory create new command factory with given env variables. - NewCmd(env Env) (func() *exec.Cmd, error) - - // NewFactory inits new factory for workers. - NewFactory() (roadrunner.Factory, error) -} diff --git a/plugins/factory/config.go b/plugins/factory/config.go new file mode 100644 index 00000000..b2d1d0ad --- /dev/null +++ b/plugins/factory/config.go @@ -0,0 +1,37 @@ +package factory + +import "time" + +// Config config combines factory, pool and cmd configurations. +type Config struct { + // Command to run as application. + Command string + + // User to run application under. + User string + + // Group to run application under. + Group string + + // Env represents application environment. + Env Env + + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. 
+ RelayTimeout time.Duration +} + +func (cfg *Config) InitDefaults() { + if cfg.Relay == "" { + cfg.Relay = "pipes" + } + + if cfg.RelayTimeout == 0 { + cfg.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go deleted file mode 100644 index f7303b6d..00000000 --- a/plugins/factory/factory.go +++ /dev/null @@ -1,73 +0,0 @@ -package factory - -import ( - "context" - - "log" - - "github.com/fatih/color" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/events" -) - -type WorkerFactory interface { - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) -} - -type WFactory struct { - events *events.EventBroadcaster - app Spawner - wFactory roadrunner.Factory -} - -func (wf *WFactory) Init(app Spawner) (err error) { - wf.events = events.NewEventBroadcaster() - - wf.app = app - wf.wFactory, err = app.NewFactory() - if err != nil { - return nil - } - - return nil -} - -func (wf *WFactory) AddListener(l events.EventListener) { - wf.events.AddListener(l) -} - -func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { - cmd, err := wf.app.NewCmd(env) - if err != nil { - return nil, err - } - - p, err := roadrunner.NewPool(ctx, cmd, wf.wFactory, opt) - if err != nil { - return nil, err - } - - // TODO event to stop - go func() { - for e := range p.Events() { - wf.events.Push(e) - if we, ok := e.Payload.(roadrunner.WorkerEvent); ok { - if we.Event == roadrunner.EventWorkerLog { - log.Print(color.YellowString(string(we.Payload.([]byte)))) - } - } - } - }() - - return p, nil -} - -func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { - cmd, err := wf.app.NewCmd(env) - if err != nil { - return nil, err - } - - return wf.wFactory.SpawnWorkerWithContext(ctx, cmd()) -} diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php deleted file mode 100644 index c6199449..00000000 --- a/plugins/factory/hello.php +++ /dev/null @@ -1 +0,0 @@ -<?php echo "hello -" . time();
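The new plugins/factory/config.go replaces the old AppConfig with a Config type that owns its defaults. The standalone sketch below restates that logic so the fallbacks are easy to check: an empty Relay becomes "pipes" and a zero RelayTimeout becomes 60 seconds. The type and field names match the diff; the demo main is illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// Env represents the application environment, as in plugins/factory.
type Env map[string]string

// Config mirrors plugins/factory/config.go introduced in this commit.
type Config struct {
	Command      string
	User         string
	Group        string
	Env          Env
	Relay        string        // "pipes", "tcp://:6001" or "unix://rr.sock"
	RelayTimeout time.Duration // how long the socket factory waits for a worker
}

// InitDefaults applies the defaults added in this commit.
func (cfg *Config) InitDefaults() {
	if cfg.Relay == "" {
		cfg.Relay = "pipes"
	}
	if cfg.RelayTimeout == 0 {
		cfg.RelayTimeout = time.Second * 60
	}
}

func main() {
	cfg := &Config{Command: "php worker.php"}
	cfg.InitDefaults()
	fmt.Println(cfg.Relay, cfg.RelayTimeout) // pipes 1m0s
}
```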
\ No newline at end of file diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go index 5ab6df73..f6b06dd0 100644 --- a/plugins/factory/tests/plugin_1.go +++ b/plugins/factory/tests/plugin_1.go @@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { func (f *Foo) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spawner.NewCmd(nil) + cmd, err := f.spawner.CommandFactory(nil) if err != nil { errCh <- err return errCh diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go index 2409627e..dbdb065b 100644 --- a/plugins/factory/tests/plugin_2.go +++ b/plugins/factory/tests/plugin_2.go @@ -13,11 +13,11 @@ import ( type Foo2 struct { configProvider config.Provider - wf factory.WorkerFactory + wf factory.AppFactory spw factory.Spawner } -func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error { +func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory, spawner factory.Spawner) error { f.configProvider = p f.wf = workerFactory f.spw = spawner @@ -27,14 +27,14 @@ func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spaw func (f *Foo2) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spw.NewCmd(nil) + cmd, err := f.spw.CommandFactory(nil) if err != nil { errCh <- err return errCh diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go index 1039ee5e..719fd5e3 100644 --- a/plugins/rpc/config.go +++ b/plugins/rpc/config.go @@ -12,6 +12,9 @@ import ( type Config struct { // Listen string Listen string + + // Disabled disables RPC service. + Disabled bool } // InitDefaults allows to init blank config with pre-defined set of default values. diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go index ef8e82e4..7b47682f 100644 --- a/plugins/rpc/rpc.go +++ b/plugins/rpc/rpc.go @@ -1,8 +1,7 @@ package rpc import ( - "errors" - + "github.com/spiral/endure/errors" "github.com/spiral/goridge/v2" "github.com/spiral/roadrunner/v2/plugins/config" @@ -14,8 +13,8 @@ type RPCService interface { RPCService() (interface{}, error) } -// ID contains default service name. -const ID = "rpc" +// ServiceName contains default service name. +const ServiceName = "rpc" type services struct { service interface{} @@ -32,14 +31,19 @@ type Service struct { // Init rpc service. Must return true if service is enabled. func (s *Service) Init(cfg config.Provider) error { - err := cfg.UnmarshalKey(ID, &s.config) + if !cfg.Has(ServiceName) { + return errors.E(errors.Disabled) + } + + err := cfg.UnmarshalKey(ServiceName, &s.config) if err != nil { return err } - s.config.InitDefaults() - // todo: handle disabled + if s.config.Disabled { + return errors.E(errors.Disabled) + } return s.config.Valid() } @@ -47,27 +51,15 @@ func (s *Service) Init(cfg config.Provider) error { // Serve serves the service. func (s *Service) Serve() chan error { s.close = make(chan struct{}) - errCh := make(chan error, 1) - server := rpc.NewServer() - if server == nil { - errCh <- errors.New("rpc server is nil") - return errCh - } - s.rpc = server - - if len(s.services) == 0 { - // todo: why this is an error? 
- errCh <- errors.New("no services with RPC") - return errCh - } + s.rpc = rpc.NewServer() // Attach all services for i := 0; i < len(s.services); i++ { err := s.Register(s.services[i].name, s.services[i].service) if err != nil { - errCh <- err + errCh <- errors.E(errors.Op("register service"), err) return errCh } } @@ -134,7 +126,7 @@ func (s *Service) RegisterService(p RPCService) error { // no suitable methods. It also logs the error using package log. func (s *Service) Register(name string, svc interface{}) error { if s.rpc == nil { - return errors.New("RPC service is not configured") + return errors.E("RPC service is not configured") } return s.rpc.RegisterName(name, svc) @@ -2,51 +2,53 @@ package roadrunner import ( "context" + "github.com/spiral/roadrunner/v2/util" "runtime" "time" ) +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event int64 + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} + const ( // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct = iota + 100 + EventWorkerConstruct = iota + 7800 // EventWorkerDestruct thrown after worker destruction. EventWorkerDestruct - // EventWorkerKill thrown after worker is being forcefully killed. - EventWorkerKill - - // EventWorkerError thrown any worker related even happen (passed with WorkerError) - EventWorkerEvent - - // EventWorkerDead thrown when worker stops worker for any reason. - EventWorkerDead - - // EventPoolError caused on pool wide errors + // EventPoolError caused on pool wide errors. EventPoolError -) -const ( - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory = iota + 8000 + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError + + // todo: EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory - // EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError + // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError EventTTL - // EventIdleTTL triggered when worker spends too much time at rest. + // todo: EventIdleTTL triggered when worker spends too much time at rest. EventIdleTTL - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). EventExecTTL ) // Pool managed set of inner worker processes. type Pool interface { - // ATTENTION, YOU SHOULD CONSUME EVENTS, OTHERWISE POOL WILL BLOCK - Events() chan PoolEvent + // AddListener connects event listener to the pool. + AddListener(listener util.EventListener) - // Exec one task with given payload and context, returns result or error. - ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) + // GetConfig returns pool configuration. + GetConfig() Config // Exec Exec(rqs Payload) (Payload, error) @@ -54,18 +56,14 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []WorkerBase) + // Remove worker from the pool. RemoveWorker(ctx context.Context, worker WorkerBase) error - Config() Config - // Destroy all underlying stack (but let them to complete the task). Destroy(ctx context.Context) } -// todo: merge with pool options - -// Config defines basic behaviour of worker creation and handling process. 
-// +// Configures the pool behaviour. type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. @@ -84,20 +82,8 @@ type Config struct { // properly destroy, if timeout reached worker will be killed. Defaults to 60s. DestroyTimeout time.Duration - // TTL defines maximum time worker is allowed to live. - TTL int64 - - // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL int64 - - // ExecTTL defines maximum lifetime per job. - ExecTTL time.Duration - - // MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes. - MaxPoolMemory uint64 - - // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 + // Supervision config to limit worker and pool memory usage. + Supervisor SupervisorConfig } // InitDefaults enables default config values. @@ -113,4 +99,30 @@ func (cfg *Config) InitDefaults() { if cfg.DestroyTimeout == 0 { cfg.DestroyTimeout = time.Minute } + + cfg.Supervisor.InitDefaults() +} + +type SupervisorConfig struct { + // WatchTick defines how often to check the state of worker. + WatchTick time.Duration + + // TTL defines maximum time worker is allowed to live. + TTL int64 + + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. + IdleTTL int64 + + // ExecTTL defines maximum lifetime per job. + ExecTTL time.Duration + + // MaxWorkerMemory limits memory per worker. + MaxWorkerMemory uint64 +} + +// InitDefaults enables default config values. +func (cfg *SupervisorConfig) InitDefaults() { + if cfg.WatchTick == 0 { + cfg.WatchTick = time.Second + } } diff --git a/pool_supervisor.go b/pool_supervisor.go deleted file mode 100644 index c0a6ecd9..00000000 --- a/pool_supervisor.go +++ /dev/null @@ -1,182 +0,0 @@ -package roadrunner - -import ( - "context" - "errors" - "fmt" - "time" -) - -const MB = 1024 * 1024 - -type Supervisor interface { - Attach(pool Pool) - StartWatching() error - StopWatching() - Detach() -} - -type staticPoolSupervisor struct { - // maxWorkerMemory in MB - maxWorkerMemory uint64 - // maxPoolMemory in MB - maxPoolMemory uint64 - // maxWorkerTTL in seconds - maxWorkerTTL uint64 - // maxWorkerIdle in seconds - maxWorkerIdle uint64 - - // watchTimeout in seconds - watchTimeout uint64 - stopCh chan struct{} - - pool Pool -} - -/* -The arguments are: -maxWorkerMemory - maximum memory allowed for a single worker -maxPoolMemory - maximum pool memory allowed for a pool of a workers -maxTtl - maximum ttl for the worker after which it will be killed and replaced -maxIdle - maximum time to live for the worker in Ready state -watchTimeout - time between watching for the workers/pool status -*/ -// TODO might be just wrap the pool and return ControlledPool with included Pool interface -func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, watchTimeout uint64) Supervisor { - if maxWorkerMemory == 0 { - // just set to a big number, 5GB - maxPoolMemory = 5000 * MB - } - - if watchTimeout == 0 { - watchTimeout = 60 - } - - return &staticPoolSupervisor{ - maxWorkerMemory: maxWorkerMemory, - maxPoolMemory: maxPoolMemory, - maxWorkerTTL: maxTtl, - maxWorkerIdle: maxIdle, - watchTimeout: watchTimeout, - stopCh: make(chan struct{}), - } -} - -func (sps *staticPoolSupervisor) Attach(pool Pool) { - sps.pool = pool -} - -func (sps *staticPoolSupervisor) StartWatching() error { - go func() { - watchTout := time.NewTicker(time.Second * 
time.Duration(sps.watchTimeout)) - for { - select { - case <-sps.stopCh: - watchTout.Stop() - return - // stop here - case <-watchTout.C: - err := sps.control() - if err != nil { - sps.pool.Events() <- PoolEvent{Payload: err} - } - } - } - }() - return nil -} - -func (sps *staticPoolSupervisor) StopWatching() { - sps.stopCh <- struct{}{} -} - -func (sps *staticPoolSupervisor) Detach() { - -} - -func (sps *staticPoolSupervisor) control() error { - if sps.pool == nil { - return errors.New("pool should be attached") - } - now := time.Now() - ctx := context.TODO() - - // THIS IS A COPY OF WORKERS - workers := sps.pool.Workers() - totalUsedMemory := uint64(0) - - for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == StateInvalid { - continue - } - - s, err := WorkerProcessState(workers[i]) - if err != nil { - err2 := sps.pool.RemoveWorker(ctx, workers[i]) - if err2 != nil { - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)} - return fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2) - } - sps.pool.Events() <- PoolEvent{Payload: err} - return err - } - - if sps.maxWorkerTTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sps.maxWorkerTTL) { - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - - // after remove worker we should exclude it from further analysis - workers = append(workers[:i], workers[i+1:]...) - } - - if sps.maxWorkerMemory != 0 && s.MemoryUsage >= sps.maxWorkerMemory*MB { - // TODO events - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sps.maxWorkerMemory)} - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - workers = append(workers[:i], workers[i+1:]...) - continue - } - - // firs we check maxWorker idle - if sps.maxWorkerIdle != 0 { - // then check for the worker state - if workers[i].State().Value() != StateReady { - continue - } - /* - Calculate idle time - If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 - 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle - we are guessing that worker overlap idle time and has to be killed - */ - // get last used unix nano - lu := workers[i].State().LastUsed() - // convert last used to unixNano and sub time.now - res := int64(lu) - now.UnixNano() - // maxWorkerIdle more than diff between now and last used - if int64(sps.maxWorkerIdle)-res <= 0 { - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed worker idle time elapsed. actual idle time: %v, max idle time: %v", sps.maxWorkerIdle, res)} - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - workers = append(workers[:i], workers[i+1:]...) 
- } - } - - // the very last step is to calculate pool memory usage (except excluded workers) - totalUsedMemory += s.MemoryUsage - } - - // if current usage more than max allowed pool memory usage - if totalUsedMemory > sps.maxPoolMemory { - sps.pool.Destroy(ctx) - } - - return nil -} diff --git a/pool_watcher.go b/pool_watcher.go new file mode 100644 index 00000000..6eb614dc --- /dev/null +++ b/pool_watcher.go @@ -0,0 +1,131 @@ +package roadrunner + +import ( + "context" + "github.com/spiral/roadrunner/v2/util" + "time" +) + +const MB = 1024 * 1024 + +type supervisedPool struct { + cfg SupervisorConfig + events *util.EventHandler + pool Pool + stopCh chan struct{} +} + +func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool { + return &supervisedPool{ + cfg: cfg, + events: events, + pool: pool, + stopCh: make(chan struct{}), + } +} + +func (sp *supervisedPool) Start() { + go func() { + watchTout := time.NewTicker(sp.cfg.WatchTick) + for { + select { + case <-sp.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + sp.control() + } + } + }() +} + +func (sp *supervisedPool) Stop() { + sp.stopCh <- struct{}{} +} + +func (sp *supervisedPool) control() { + now := time.Now() + ctx := context.TODO() + + // THIS IS A COPY OF WORKERS + workers := sp.pool.Workers() + + for i := 0; i < len(workers); i++ { + if workers[i].State().Value() == StateInvalid { + continue + } + + s, err := WorkerProcessState(workers[i]) + if err != nil { + // worker not longer valid for supervision + continue + } + + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { + err = sp.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + return + } else { + sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) + } + + continue + } + + if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + // TODO events + //sp.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sp.maxWorkerMemory)} + err = sp.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + return + } else { + sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) + } + + continue + } + + // firs we check maxWorker idle + if sp.cfg.IdleTTL != 0 { + // then check for the worker state + if workers[i].State().Value() != StateReady { + continue + } + + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. 
For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + + // get last used unix nano + lu := workers[i].State().LastUsed() + + // convert last used to unixNano and sub time.now + res := int64(lu) - now.UnixNano() + + // maxWorkerIdle more than diff between now and last used + if sp.cfg.IdleTTL-res <= 0 { + err = sp.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + return + } else { + sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]}) + } + } + } + + // the very last step is to calculate pool memory usage (except excluded workers) + //totalUsedMemory += s.MemoryUsage + } + + //// if current usage more than max allowed pool memory usage + //if totalUsedMemory > sp.maxPoolMemory { + // sp.pool.Destroy(ctx) + //} +} diff --git a/socket_factory.go b/socket_factory.go index 27558cce..0db7849b 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -2,6 +2,7 @@ package roadrunner import ( "context" + "github.com/shirou/gopsutil/process" "net" "os/exec" "strings" @@ -110,6 +111,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm w.Kill(context.Background()), w.Wait(context.Background()), ) + c <- socketSpawn{ w: nil, err: err, @@ -178,10 +180,16 @@ func (f *SocketFactory) Close(ctx context.Context) error { // waits for WorkerProcess to connect over socket and returns associated relay of timeout func (f *SocketFactory) findRelayWithContext(ctx context.Context, w WorkerBase) (*goridge.SocketRelay, error) { + ticker := time.NewTicker(time.Millisecond * 100) for { select { case <-ctx.Done(): return nil, ctx.Err() + case <-ticker.C: + _, err := process.NewProcess(int32(w.Pid())) + if err != nil { + return nil, err + } default: tmp, ok := f.relays.Load(w.Pid()) if !ok { diff --git a/socket_factory_test.go b/socket_factory_test.go index cfb95ca1..6ab87872 100644 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -98,28 +98,29 @@ func Test_Tcp_StartError(t *testing.T) { assert.Nil(t, w) } -// func Test_Tcp_Failboot(t *testing.T) { -// time.Sleep(time.Millisecond * 10) // to ensure free socket -// -// ls, err := net.Listen("tcp", "localhost:9007") -// if assert.NoError(t, err) { -// defer func() { -// err3 := ls.Close() -// if err3 != nil { -// t.Errorf("error closing the listener: error %v", err3) -// } -// }() -// } else { -// t.Skip("socket is busy") -// } -// -// cmd := exec.Command("php", "tests/failboot.php") -// -// w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(cmd) -// assert.Nil(t, w) -// assert.Error(t, err2) -// assert.Contains(t, err2.Error(), "failboot") -//} +func Test_Tcp_Failboot(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/failboot.php") + + w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err2) + assert.Contains(t, err2.Error(), "failboot") +} func Test_Tcp_Timeout(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket @@ -161,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) { cmd := 
exec.Command("php", "tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -208,7 +209,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -248,7 +249,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -304,7 +305,7 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*2).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "failboot") @@ -393,7 +394,7 @@ func Test_Unix_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) @@ -436,7 +437,7 @@ func Test_Unix_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -512,7 +513,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -580,7 +581,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/static_pool.go b/static_pool.go index 0c2352ad..31923134 100644 --- a/static_pool.go +++ b/static_pool.go @@ -3,23 +3,21 @@ package roadrunner import ( "context" "fmt" + "github.com/spiral/roadrunner/v2/util" "os/exec" "sync" "github.com/pkg/errors" ) -const ( - // StopRequest can be sent by worker to indicate that restart is required. - StopRequest = "{\"stop\":true}" -) +// StopRequest can be sent by worker to indicate that restart is required. +const StopRequest = "{\"stop\":true}" var bCtx = context.Background() // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { - // pool behaviour - cfg *Config + cfg Config // worker command creator cmd func() *exec.Cmd @@ -27,30 +25,31 @@ type StaticPool struct { // creates and connects to stack factory Factory + // distributes the events + events *util.EventHandler + // protects state of worker list, does not affect allocation muw sync.RWMutex - ww *WorkersWatcher + // manages worker states and TTLs + ww *workerWatcher - events chan PoolEvent -} -type PoolEvent struct { - Payload interface{} + // supervises memory and TTL of workers + sp *supervisedPool } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -// TODO why cfg is passed by pointer? 
-func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) { +func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) { cfg.InitDefaults() p := &StaticPool{ cfg: cfg, cmd: cmd, factory: factory, - events: make(chan PoolEvent), + events: &util.EventHandler{}, } - p.ww = NewWorkerWatcher(func(args ...interface{}) (WorkerBase, error) { + p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) { w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd()) if err != nil { return nil, err @@ -74,12 +73,21 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co return nil, err } + // todo: implement + //p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor) + //p.sp.Start() + return p, nil } +// AddListener connects event listener to the pool. +func (p *StaticPool) AddListener(listener util.EventListener) { + p.events.AddListener(listener) +} + // Config returns associated pool configuration. Immutable. -func (p *StaticPool) Config() Config { - return *p.cfg +func (p *StaticPool) GetConfig() Config { + return p.cfg } // Workers returns worker list associated with the pool. @@ -103,18 +111,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { rsp, err := sw.Exec(rqs) if err != nil { - errJ := p.checkMaxJobs(bCtx, w) - if errJ != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) - } // soft job errors are allowed - if _, jobError := err.(TaskError); jobError { - p.ww.PushWorker(w) + if _, jobError := err.(JobError); jobError { + if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { + err := p.ww.AllocateNew(bCtx) + if err != nil { + p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + } + + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + } + } else { + p.ww.PushWorker(w) + } + return EmptyPayload, err } sw.State().Set(StateInvalid) + p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) + if errS != nil { return EmptyPayload, fmt.Errorf("%v, %v", err, errS) } @@ -127,9 +147,10 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - return EmptyPayload, err + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } - return p.ExecWithContext(bCtx, rqs) + + return p.Exec(rqs) } if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { @@ -146,81 +167,81 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { } // Exec one task with given payload and context, returns result or error. -func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - // todo: why TODO passed here? 
- getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) - defer cancel() - w, err := p.ww.GetFreeWorker(getWorkerCtx) - if err != nil && errors.Is(err, ErrWatcherStopped) { - return EmptyPayload, ErrWatcherStopped - } else if err != nil { - return EmptyPayload, err - } - - sw := w.(SyncWorker) - - var execCtx context.Context - if p.cfg.ExecTTL != 0 { - var cancel2 context.CancelFunc - execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL) - defer cancel2() - } else { - execCtx = ctx - } - - rsp, err := sw.ExecWithContext(execCtx, rqs) - if err != nil { - errJ := p.checkMaxJobs(ctx, w) - if errJ != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) - } - // soft job errors are allowed - if _, jobError := err.(TaskError); jobError { - p.ww.PushWorker(w) - return EmptyPayload, err - } - - sw.State().Set(StateInvalid) - errS := w.Stop(ctx) - if errS != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errS) - } - - return EmptyPayload, err - } - - // worker want's to be terminated - if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - w.State().Set(StateInvalid) - err = w.Stop(ctx) - if err != nil { - return EmptyPayload, err - } - return p.ExecWithContext(ctx, rqs) - } - - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err = p.ww.AllocateNew(ctx) - if err != nil { - return EmptyPayload, err - } - } else { - p.muw.Lock() - p.ww.PushWorker(w) - p.muw.Unlock() - } - return rsp, nil -} +//func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { +// // todo: why TODO passed here? +// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) +// defer cancel() +// w, err := p.ww.GetFreeWorker(getWorkerCtx) +// if err != nil && errors.Is(err, ErrWatcherStopped) { +// return EmptyPayload, ErrWatcherStopped +// } else if err != nil { +// return EmptyPayload, err +// } +// +// sw := w.(SyncWorker) +// +// // todo: implement worker destroy +// //execCtx context.Context +// //if p.cfg.Supervisor.ExecTTL != 0 { +// // var cancel2 context.CancelFunc +// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL) +// // defer cancel2() +// //} else { +// // execCtx = ctx +// //} +// +// rsp, err := sw.Exec(rqs) +// if err != nil { +// errJ := p.checkMaxJobs(ctx, w) +// if errJ != nil { +// // todo: worker was not destroyed +// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) +// } +// +// // soft job errors are allowed +// if _, jobError := err.(JobError); jobError { +// p.ww.PushWorker(w) +// return EmptyPayload, err +// } +// +// sw.State().Set(StateInvalid) +// errS := w.Stop(ctx) +// if errS != nil { +// return EmptyPayload, fmt.Errorf("%v, %v", err, errS) +// } +// +// return EmptyPayload, err +// } +// +// // worker want's to be terminated +// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { +// w.State().Set(StateInvalid) +// err = w.Stop(ctx) +// if err != nil { +// return EmptyPayload, err +// } +// return p.ExecWithContext(ctx, rqs) +// } +// +// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { +// err = p.ww.AllocateNew(ctx) +// if err != nil { +// return EmptyPayload, err +// } +// } else { +// p.muw.Lock() +// p.ww.PushWorker(w) +// p.muw.Unlock() +// } +// +// return rsp, nil +//} // Destroy all underlying stack (but let them to complete the task). 
func (p *StaticPool) Destroy(ctx context.Context) { p.ww.Destroy(ctx) } -func (p *StaticPool) Events() chan PoolEvent { - return p.events -} - // allocate required number of stack func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { var workers []WorkerBase @@ -243,6 +264,7 @@ func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { err := p.ww.AllocateNew(ctx) if err != nil { + p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) return err } } diff --git a/static_pool_test.go b/static_pool_test.go index ce9e6820..4a0c483a 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -2,7 +2,6 @@ package roadrunner import ( "context" - "fmt" "log" "os/exec" "runtime" @@ -18,7 +17,6 @@ var cfg = Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, } func Test_NewPool(t *testing.T) { @@ -27,12 +25,10 @@ func Test_NewPool(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) - assert.Equal(t, cfg, p.Config()) - defer p.Destroy(ctx) assert.NotNil(t, p) @@ -43,7 +39,7 @@ func Test_StaticPool_Invalid(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") }, NewPipeFactory(), - &cfg, + cfg, ) assert.Nil(t, p) @@ -55,7 +51,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -71,7 +67,7 @@ func Test_StaticPool_Echo(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -79,7 +75,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -95,7 +91,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -103,7 +99,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -119,7 +115,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -127,7 +123,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -143,31 +139,31 @@ func Test_StaticPool_JobError(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := 
p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, TaskError{}, err) + assert.IsType(t, JobError{}, err) assert.Equal(t, "hello", err.Error()) } // TODO temporary commented, figure out later -// func Test_StaticPool_Broken_Replace(t *testing.T) { +//func Test_StaticPool_Broken_Replace(t *testing.T) { // ctx := context.Background() // p, err := NewPool( // ctx, // func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, // NewPipeFactory(), -// &cfg, +// cfg, // ) // assert.NoError(t, err) // assert.NotNil(t, p) @@ -177,6 +173,10 @@ func Test_StaticPool_JobError(t *testing.T) { // var i int64 // atomic.StoreInt64(&i, 10) // +// p.AddListener(func(event interface{}) { +// +// }) +// // go func() { // for { // select { @@ -206,14 +206,14 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -226,17 +226,13 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { // Consume pool events wg := sync.WaitGroup{} wg.Add(1) - go func() { - for true { - select { - case ev := <-p.Events(): - fmt.Println(ev) - if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct { - wg.Done() - } + p.AddListener(func(event interface{}) { + if pe, ok := event.(PoolEvent); ok { + if pe.Event == EventWorkerConstruct { + wg.Done() } } - }() + }) // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill(ctx) @@ -258,11 +254,10 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, - ExecTTL: time.Second * 4, }, ) assert.Error(t, err) @@ -275,12 +270,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) assert.NoError(t, err) @@ -291,11 +285,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, _ := p.Exec(Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -314,11 +308,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 15, }, ) assert.NoError(t, err) @@ -326,26 +319,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { assert.NotNil(t, p) - go func() { - for { - select { - case ev := 
<-p.Events(): - fmt.Println(ev) - } - } - }() - var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -364,11 +348,10 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) @@ -376,7 +359,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -387,11 +370,10 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) @@ -399,7 +381,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err := p.Exec(Payload{Body: []byte("100")}) if err != nil { t.Errorf("error executing payload: error %v", err) } @@ -407,7 +389,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 10) p.Destroy(ctx) - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -418,11 +400,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) assert.NoError(t, err) @@ -434,7 +415,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { w.State().Set(StateErrored) } - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) } @@ -444,11 +425,10 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) @@ -464,7 +444,7 @@ func Benchmark_Pool_Echo(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) if err != nil { b.Fatal(err) @@ -473,7 +453,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -486,11 +466,10 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", 
"pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) defer p.Destroy(ctx) @@ -500,7 +479,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -517,12 +496,11 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) defer p.Destroy(ctx) @@ -530,7 +508,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/sync_worker.go b/sync_worker.go index de9491d6..cbc2cc0b 100644 --- a/sync_worker.go +++ b/sync_worker.go @@ -3,6 +3,7 @@ package roadrunner import ( "context" "fmt" + "github.com/spiral/roadrunner/v2/util" "time" "github.com/pkg/errors" @@ -14,19 +15,17 @@ var EmptyPayload = Payload{} type SyncWorker interface { // WorkerBase provides basic functionality for the SyncWorker WorkerBase + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs Payload) (Payload, error) - - // ExecWithContext allow to set ExecTTL - ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) } -type taskWorker struct { +type syncWorker struct { w WorkerBase } func NewSyncWorker(w WorkerBase) (SyncWorker, error) { - return &taskWorker{ + return &syncWorker{ w: w, }, nil } @@ -36,68 +35,9 @@ type twexec struct { err error } -func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - c := make(chan twexec) - go func() { - if len(rqs.Body) == 0 && len(rqs.Context) == 0 { - c <- twexec{ - payload: EmptyPayload, - err: fmt.Errorf("payload can not be empty"), - } - return - } - - if tw.w.State().Value() != StateReady { - c <- twexec{ - payload: EmptyPayload, - err: fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()), - } - return - } - - // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(StateWorking) - - rsp, err := tw.execPayload(rqs) - if err != nil { - if _, ok := err.(TaskError); !ok { - tw.w.State().Set(StateErrored) - tw.w.State().RegisterExec() - } - c <- twexec{ - payload: EmptyPayload, - err: err, - } - return - } - - tw.w.State().Set(StateReady) - tw.w.State().RegisterExec() - c <- twexec{ - payload: rsp, - err: nil, - } - return - }() - - for { - select { - case <-ctx.Done(): - return EmptyPayload, ctx.Err() - case res := <-c: - if res.err != nil { - return EmptyPayload, res.err - } - - return res.payload, nil - } - } -} - -// -func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { - if len(rqs.Body) == 0 && len(rqs.Context) == 0 { +// Exec payload without TTL timeout. 
+func (tw *syncWorker) Exec(p Payload) (Payload, error) { + if len(p.Body) == 0 && len(p.Context) == 0 { return EmptyPayload, fmt.Errorf("payload can not be empty") } @@ -109,9 +49,9 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) tw.w.State().Set(StateWorking) - rsp, err := tw.execPayload(rqs) + rsp, err := tw.execPayload(p) if err != nil { - if _, ok := err.(TaskError); !ok { + if _, ok := err.(JobError); !ok { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -124,7 +64,7 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { return rsp, nil } -func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { +func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) { // two things; todo: merge if err := sendControl(tw.w.Relay(), rqs.Context); err != nil { return EmptyPayload, errors.Wrap(err, "header error") @@ -147,7 +87,7 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { } if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, TaskError(rsp.Context) + return EmptyPayload, JobError(rsp.Context) } // add streaming support :) @@ -158,46 +98,46 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { return rsp, nil } -func (tw *taskWorker) String() string { +func (tw *syncWorker) String() string { return tw.w.String() } -func (tw *taskWorker) Created() time.Time { - return tw.w.Created() +func (tw *syncWorker) Pid() int64 { + return tw.w.Pid() } -func (tw *taskWorker) Events() <-chan WorkerEvent { - return tw.w.Events() +func (tw *syncWorker) Created() time.Time { + return tw.w.Created() } -func (tw *taskWorker) Pid() int64 { - return tw.w.Pid() +func (tw *syncWorker) AddListener(listener util.EventListener) { + tw.w.AddListener(listener) } -func (tw *taskWorker) State() State { +func (tw *syncWorker) State() State { return tw.w.State() } -func (tw *taskWorker) Start() error { +func (tw *syncWorker) Start() error { return tw.w.Start() } -func (tw *taskWorker) Wait(ctx context.Context) error { +func (tw *syncWorker) Wait(ctx context.Context) error { return tw.w.Wait(ctx) } -func (tw *taskWorker) Stop(ctx context.Context) error { +func (tw *syncWorker) Stop(ctx context.Context) error { return tw.w.Stop(ctx) } -func (tw *taskWorker) Kill(ctx context.Context) error { +func (tw *syncWorker) Kill(ctx context.Context) error { return tw.w.Kill(ctx) } -func (tw *taskWorker) Relay() goridge.Relay { +func (tw *syncWorker) Relay() goridge.Relay { return tw.w.Relay() } -func (tw *taskWorker) AttachRelay(rl goridge.Relay) { +func (tw *syncWorker) AttachRelay(rl goridge.Relay) { tw.w.AttachRelay(rl) } diff --git a/sync_worker_test.go b/sync_worker_test.go index f4868009..ad1513d7 100644 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -2,13 +2,10 @@ package roadrunner import ( "context" - "errors" + "github.com/stretchr/testify/assert" "os/exec" "sync" "testing" - "time" - - "github.com/stretchr/testify/assert" ) func Test_Echo(t *testing.T) { @@ -34,7 +31,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -65,7 +62,7 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := syncWorker.ExecWithContext(ctx, EmptyPayload) + res, err := syncWorker.Exec(EmptyPayload) assert.Error(t, err) assert.Nil(t, res.Body) @@ -84,7 +81,6 @@ func Test_NotStarted_String(t *testing.T) { } func 
Test_NotStarted_Exec(t *testing.T) { - ctx := context.Background() cmd := exec.Command("php", "tests/client.php", "echo", "pipes") w, _ := InitBaseWorker(cmd) @@ -94,7 +90,7 @@ func Test_NotStarted_Exec(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -143,7 +139,7 @@ func Test_Echo_Slow(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -164,28 +160,34 @@ func Test_Broken(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - go func() { - assert.NotNil(t, w) - tt := time.NewTimer(time.Second * 10) - defer wg.Done() - for { - select { - case ev := <-w.Events(): - assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()") - return - case <-tt.C: - assert.Error(t, errors.New("no events from worker")) - return - } - } - }() + + w.AddListener(func(event interface{}) { + assert.Contains(t, string(event.(WorkerEvent).Payload.([]byte)), "undefined_function()") + wg.Done() + }) + + //go func() { + // assert.NotNil(t, w) + // tt := time.NewTimer(time.Second * 10) + // defer wg.Done() + // for { + // select { + // case ev := <-w.Events(): + // assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()") + // return + // case <-tt.C: + // assert.Error(t, errors.New("no events from worker")) + // return + // } + // } + //}() syncWorker, err := NewSyncWorker(w) if err != nil { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -215,12 +217,12 @@ func Test_Error(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, TaskError{}, err) + assert.IsType(t, JobError{}, err) assert.Equal(t, "hello", err.Error()) } @@ -244,19 +246,19 @@ func Test_NumExecs(t *testing.T) { t.Fatal(err) } - _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(1), w.State().NumExecs()) - _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(2), w.State().NumExecs()) - _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/util/events.go b/util/events.go new file mode 100644 index 00000000..9e12c4f7 --- /dev/null +++ b/util/events.go @@ -0,0 +1,26 @@ +package util + +// Event listener listens for the events produced by worker, worker pool or other servce. +type EventListener func(event interface{}) + +// EventHandler helps to broadcast events to multiple listeners. +type EventHandler struct { + listeners []EventListener +} + +// NumListeners returns number of event listeners. 
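`Test_Broken` above shows the replacement for the removed `Events()` channel: consumers register a callback via `AddListener` and type-assert the events they care about. A small helper in the same spirit, assuming it lives in package `roadrunner`:

```go
package roadrunner

import "log"

// captureWorkerLogs wires a stderr listener onto an already-created worker.
// EventWorkerLog events carry the batched stderr output as []byte.
func captureWorkerLogs(w WorkerBase) {
	w.AddListener(func(event interface{}) {
		if ev, ok := event.(WorkerEvent); ok && ev.Event == EventWorkerLog {
			log.Printf("worker %d stderr: %s", ev.Worker.Pid(), ev.Payload.([]byte))
		}
	})
}
```

As the worker.go hunk later in this diff shows, `AddListener` also flips the error buffer's `enable` flag, so stderr batches only start flowing once at least one listener is registered.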
+func (eb *EventHandler) NumListeners() int { + return len(eb.listeners) +} + +// AddListener registers new event listener. +func (eb *EventHandler) AddListener(listener EventListener) { + eb.listeners = append(eb.listeners, listener) +} + +// Push broadcast events across all event listeners. +func (eb *EventHandler) Push(e interface{}) { + for _, listener := range eb.listeners { + listener(e) + } +} @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/spiral/roadrunner/v2/util" "os" "os/exec" "strconv" @@ -15,6 +16,12 @@ import ( "go.uber.org/multierr" ) +const ( + // WaitDuration - for how long error buffer should attempt to aggregate error messages + // before merging output together since lastError update (required to keep error update together). + WaitDuration = 25 * time.Millisecond +) + // EventWorkerKill thrown after WorkerProcess is being forcefully killed. const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. @@ -22,38 +29,31 @@ const ( // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog - - // EventWorkerWaitDone triggered when worker exit from process Wait - EventWorkerWaitDone // todo: implemented? - - EventWorkerBufferClosed - - EventRelayCloseError - - EventWorkerProcessError ) -const ( - // WaitDuration - for how long error buffer should attempt to aggregate error messages - // before merging output together since lastError update (required to keep error update together). - WaitDuration = 100 * time.Millisecond -) - -// todo: write comment +// WorkerEvent wraps worker events. type WorkerEvent struct { - Event int64 - Worker WorkerBase + // Event id, see below. + Event int64 + + // Worker triggered the event. + Worker WorkerBase + + // Event specific payload. Payload interface{} } type WorkerBase interface { fmt.Stringer - Created() time.Time + // Pid returns worker pid. + Pid() int64 - Events() <-chan WorkerEvent + // Created returns time worker was created at. + Created() time.Time - Pid() int64 + // AddListener attaches listener to consume worker events. + AddListener(listener util.EventListener) // State return receive-only WorkerProcess state object, state can be used to safely access // WorkerProcess status, time when status changed and number of WorkerProcess executions. @@ -88,7 +88,7 @@ type WorkerProcess struct { created time.Time // updates parent supervisor or pool about WorkerProcess events - events chan WorkerEvent + events *util.EventHandler // state holds information about current WorkerProcess state, // number of WorkerProcess executions, buf status change time. @@ -129,7 +129,7 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { } w := &WorkerProcess{ created: time.Now(), - events: make(chan WorkerEvent, 10), + events: &util.EventHandler{}, cmd: cmd, state: newState(StateInactive), } @@ -142,12 +142,23 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { return w, nil } +// Pid returns worker pid. +func (w *WorkerProcess) Pid() int64 { + return int64(w.pid) +} + +// Created returns time worker was created at. func (w *WorkerProcess) Created() time.Time { return w.created } -func (w *WorkerProcess) Pid() int64 { - return int64(w.pid) +// AddListener registers new worker event listener. 
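The new `util/events.go` file introduced above replaces buffered event channels with a small synchronous broadcaster. Its entire surface appears in this diff, so a standalone usage sketch is straightforward:

```go
package main

import (
	"fmt"

	"github.com/spiral/roadrunner/v2/util"
)

func main() {
	eh := &util.EventHandler{}

	// Listeners are plain func(event interface{}) callbacks.
	eh.AddListener(func(event interface{}) {
		fmt.Printf("event: %v\n", event)
	})

	fmt.Println("listeners:", eh.NumListeners()) // 1

	// Push is synchronous: every listener runs in the caller's goroutine,
	// so listeners should return quickly or hand off to their own goroutine.
	eh.Push("worker exited")
}
```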
+func (w *WorkerProcess) AddListener(listener util.EventListener) { + w.events.AddListener(listener) + + w.errBuffer.mu.Lock() + w.errBuffer.enable = true + w.errBuffer.mu.Unlock() } // State return receive-only WorkerProcess state object, state can be used to safely access @@ -195,10 +206,6 @@ func (w *WorkerProcess) Start() error { return nil } -func (w *WorkerProcess) Events() <-chan WorkerEvent { - return w.events -} - // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails @@ -208,15 +215,8 @@ func (w *WorkerProcess) Wait(ctx context.Context) error { w.endState = w.cmd.ProcessState if err != nil { w.state.Set(StateErrored) - // if there are messages in the events channel, read it - // TODO potentially danger place - if len(w.events) > 0 { - select { - case ev := <-w.events: - err = multierr.Append(err, errors.New(string(ev.Payload.([]byte)))) - } - } - // if no errors in the events, error might be in the errbuffer + + // if no errors in the events, error might be in the errBuffer if w.errBuffer.Len() > 0 { err = multierr.Append(err, errors.New(w.errBuffer.String())) } @@ -250,6 +250,7 @@ func (w *WorkerProcess) closeRelay() error { // Stop sends soft termination command to the WorkerProcess and waits for process completion. func (w *WorkerProcess) Stop(ctx context.Context) error { c := make(chan error) + go func() { var err error w.errBuffer.Close() @@ -264,6 +265,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error { w.state.Set(StateStopped) c <- nil }() + select { case <-ctx.Done(): return ctx.Err() @@ -290,16 +292,17 @@ func (w *WorkerProcess) Kill(ctx context.Context) error { } func (w *WorkerProcess) logCallback(log []byte) { - w.events <- WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log} + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}) } // thread safe errBuffer type errBuffer struct { - mu sync.RWMutex - buf []byte - last int - wait *time.Timer - // todo remove update + enable bool + mu sync.RWMutex + buf []byte + last int + wait *time.Timer + // todo: remove update update chan interface{} stop chan interface{} logCallback func(log []byte) @@ -321,7 +324,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb.wait.Reset(WaitDuration) case <-eb.wait.C: eb.mu.Lock() - if len(eb.buf) > eb.last { + if eb.enable && len(eb.buf) > eb.last { eb.logCallback(eb.buf[eb.last:]) eb.buf = eb.buf[0:0] eb.last = len(eb.buf) @@ -331,11 +334,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb.wait.Stop() eb.mu.Lock() - if len(eb.buf) > eb.last { - if eb == nil || eb.logCallback == nil { - eb.mu.Unlock() - return - } + if eb.enable && len(eb.buf) > eb.last { eb.logCallback(eb.buf[eb.last:]) eb.last = len(eb.buf) } diff --git a/worker_test.go b/worker_test.go index a90b7ef2..d2744345 100644 --- a/worker_test.go +++ b/worker_test.go @@ -91,6 +91,7 @@ func TestErrBuffer_Write_Event(t *testing.T) { assert.Equal(t, []byte("hello\n"), log) wg.Done() } + buf.enable = true _, err := buf.Write([]byte("hello\n")) if err != nil { @@ -116,6 +117,8 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) { assert.Equal(t, []byte("hello\nending"), log) wg.Done() } + buf.enable = true + _, err := buf.Write([]byte("hel")) if err != nil { t.Errorf("fail to write: error %v", err) diff --git a/workers_watcher.go 
b/worker_watcher.go index d9d27196..773f7745 100644 --- a/workers_watcher.go +++ b/worker_watcher.go @@ -3,6 +3,7 @@ package roadrunner import ( "context" "errors" + "github.com/spiral/roadrunner/v2/util" "sync" "time" ) @@ -59,36 +60,36 @@ func (stack *Stack) Pop() (WorkerBase, bool) { return w, false } -type WorkersWatcher struct { - mutex sync.RWMutex - stack *Stack - allocator func(args ...interface{}) (WorkerBase, error) - initialNumWorkers int64 - actualNumWorkers int64 - events chan PoolEvent -} - type WorkerWatcher interface { // AddToWatch used to add stack to wait its state AddToWatch(ctx context.Context, workers []WorkerBase) error + // GetFreeWorker provide first free worker GetFreeWorker(ctx context.Context) (WorkerBase, error) + // PutWorker enqueues worker back PushWorker(w WorkerBase) + // AllocateNew used to allocate new worker and put in into the WorkerWatcher AllocateNew(ctx context.Context) error + // Destroy destroys the underlying stack Destroy(ctx context.Context) + // WorkersList return all stack w/o removing it from internal storage WorkersList() []WorkerBase + // RemoveWorker remove worker from the stack RemoveWorker(ctx context.Context, wb WorkerBase) error } // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher { - // todo check if events not nil - ww := &WorkersWatcher{ +func newWorkerWatcher( + allocator func(args ...interface{}) (WorkerBase, error), + numWorkers int64, + events *util.EventHandler, +) *workerWatcher { + ww := &workerWatcher{ stack: NewWorkersStack(), allocator: allocator, initialNumWorkers: numWorkers, @@ -99,14 +100,23 @@ func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), n return ww } -func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error { +type workerWatcher struct { + mutex sync.RWMutex + stack *Stack + allocator func(args ...interface{}) (WorkerBase, error) + initialNumWorkers int64 + actualNumWorkers int64 + events *util.EventHandler +} + +func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error { for i := 0; i < len(workers); i++ { sw, err := NewSyncWorker(workers[i]) if err != nil { return err } ww.stack.Push(sw) - ww.watch(sw) + sw.AddListener(ww.events.Push) go func(swc WorkerBase) { ww.wait(ctx, swc) @@ -115,12 +125,13 @@ func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) return nil } -func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) { +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) { // thread safe operation w, stop := ww.stack.Pop() if stop { return nil, ErrWatcherStopped } + // handle worker remove state // in this state worker is destroyed by supervisor if w != nil && w.State().Value() == StateRemove { @@ -131,6 +142,7 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) // try to get next return ww.GetFreeWorker(ctx) } + // no free stack if w == nil { tout := time.NewTicker(time.Second * 180) @@ -152,23 +164,31 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) } } } + ww.decreaseNumOfActualWorkers() return w, nil } -func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error { +func (ww *workerWatcher) AllocateNew(ctx context.Context) error { ww.stack.mutex.Lock() sw, err := ww.allocator() if err != nil { return err } 
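This hunk renames `WorkersWatcher` to the unexported `workerWatcher` and swaps its `chan PoolEvent` for a shared `*util.EventHandler`. A sketch of the wiring, written as if inside package `roadrunner`; the `spawn` callback stands in for whatever allocator the pool's factory provides:

```go
package roadrunner

import (
	"context"

	"github.com/spiral/roadrunner/v2/util"
)

// buildWatcher is illustrative only; the real pool owns this wiring.
func buildWatcher(
	ctx context.Context,
	spawn func(args ...interface{}) (WorkerBase, error),
	events *util.EventHandler,
) (*workerWatcher, error) {
	const num = 4
	ww := newWorkerWatcher(spawn, num, events)

	workers := make([]WorkerBase, 0, num)
	for i := 0; i < num; i++ {
		w, err := spawn()
		if err != nil {
			return nil, err
		}
		workers = append(workers, w)
	}

	// AddToWatch wraps each worker in a SyncWorker, pushes it onto the stack,
	// forwards its events via sw.AddListener(events.Push), and starts a wait()
	// goroutine that replaces the worker if it exits unexpectedly.
	if err := ww.AddToWatch(ctx, workers); err != nil {
		return nil, err
	}
	return ww, nil
}
```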
+ ww.addToWatch(sw) ww.stack.mutex.Unlock() ww.PushWorker(sw) + + ww.events.Push(PoolEvent{ + Event: EventWorkerConstruct, + Payload: sw, + }) + return nil } -func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error { +func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error { ww.stack.mutex.Lock() defer ww.stack.mutex.Unlock() pid := wb.Pid() @@ -193,19 +213,19 @@ func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error } // O(1) operation -func (ww *WorkersWatcher) PushWorker(w WorkerBase) { +func (ww *workerWatcher) PushWorker(w WorkerBase) { ww.mutex.Lock() ww.actualNumWorkers++ ww.mutex.Unlock() ww.stack.Push(w) } -func (ww *WorkersWatcher) ReduceWorkersCount() { +func (ww *workerWatcher) ReduceWorkersCount() { ww.decreaseNumOfActualWorkers() } // Destroy all underlying stack (but let them to complete the task) -func (ww *WorkersWatcher) Destroy(ctx context.Context) { +func (ww *workerWatcher) Destroy(ctx context.Context) { ww.stack.mutex.Lock() ww.stack.destroy = true ww.stack.mutex.Unlock() @@ -238,67 +258,63 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation -func (ww *WorkersWatcher) WorkersList() []WorkerBase { +func (ww *workerWatcher) WorkersList() []WorkerBase { return ww.stack.workers } -func (ww *WorkersWatcher) wait(ctx context.Context, w WorkerBase) { +func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { err := w.Wait(ctx) if err != nil { - ww.events <- PoolEvent{Payload: WorkerEvent{ + ww.events.Push(WorkerEvent{ Event: EventWorkerError, Worker: w, Payload: err, - }} + }) } - // If not destroyed, reallocate - if w.State().Value() != StateDestroyed { - pid := w.Pid() - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // worker in the stack, reallocating - if ww.stack.workers[i].Pid() == pid { - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.decreaseNumOfActualWorkers() - ww.stack.mutex.Unlock() - err = ww.AllocateNew(ctx) - if err != nil { - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerError, - Worker: w, - Payload: err, - }} - return - } - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerConstruct, - Worker: nil, - Payload: nil, - }} - return + + if w.State().Value() == StateDestroyed { + // worker was manually destroyed, no need to replace + return + } + + pid := w.Pid() + ww.stack.mutex.Lock() + for i := 0; i < len(ww.stack.workers); i++ { + // worker in the stack, reallocating + if ww.stack.workers[i].Pid() == pid { + + ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) 
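With the channels gone, the watcher reports failures and replacements through the same handler: `WorkerEvent` for worker errors, `PoolEvent` for allocation results. A listener can separate them with a type switch. Note that `EventPoolError` and `EventWorkerConstruct` are referenced in this hunk but defined in `pool.go`, which is not part of this excerpt, so their use below rests on that assumption:

```go
package roadrunner

import (
	"log"

	"github.com/spiral/roadrunner/v2/util"
)

// poolEventLogger returns a listener suitable for EventHandler.AddListener.
func poolEventLogger() util.EventListener {
	return func(event interface{}) {
		switch e := event.(type) {
		case WorkerEvent:
			if e.Event == EventWorkerError {
				log.Printf("worker %d failed: %v", e.Worker.Pid(), e.Payload)
			}
		case PoolEvent:
			switch e.Event {
			case EventWorkerConstruct:
				log.Printf("replacement worker allocated")
			case EventPoolError:
				log.Printf("pool error: %v", e.Payload)
			}
		}
	}
}
```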
+ ww.decreaseNumOfActualWorkers() + ww.stack.mutex.Unlock() + + err = ww.AllocateNew(ctx) + if err != nil { + ww.events.Push(PoolEvent{ + Event: EventPoolError, + Payload: err, + }) } - } - ww.stack.mutex.Unlock() - // worker not in the stack (not returned), forget and allocate new - err = ww.AllocateNew(ctx) - if err != nil { - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerError, - Worker: w, - Payload: err, - }} + return } - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerConstruct, - Worker: nil, - Payload: nil, - }} } + + ww.stack.mutex.Unlock() + + // worker not in the stack (not returned), forget and allocate new + err = ww.AllocateNew(ctx) + if err != nil { + ww.events.Push(PoolEvent{ + Event: EventPoolError, + Payload: err, + }) + return + } + return } -func (ww *WorkersWatcher) addToWatch(wb WorkerBase) { +func (ww *workerWatcher) addToWatch(wb WorkerBase) { ww.mutex.Lock() defer ww.mutex.Unlock() go func() { @@ -306,18 +322,8 @@ func (ww *WorkersWatcher) addToWatch(wb WorkerBase) { }() } -func (ww *WorkersWatcher) decreaseNumOfActualWorkers() { +func (ww *workerWatcher) decreaseNumOfActualWorkers() { ww.mutex.Lock() ww.actualNumWorkers-- ww.mutex.Unlock() } - -func (ww *WorkersWatcher) watch(swc WorkerBase) { - // todo make event to stop function - go func() { - select { - case ev := <-swc.Events(): - ww.events <- PoolEvent{Payload: ev} - } - }() -} |
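Taken together, the watcher exposes a borrow/execute/return cycle; the static pool's `Exec` presumably follows roughly this shape. A simplified sketch under that assumption (a real pool would also consult the error and the worker's state, and respect `MaxJobs`, before reuse):

```go
package roadrunner

import "context"

// execOnce is a simplified sketch of how a pool might drive the watcher.
func execOnce(ctx context.Context, ww *workerWatcher, p Payload) (Payload, error) {
	w, err := ww.GetFreeWorker(ctx) // blocks until a worker is free or the watcher stops
	if err != nil {
		return EmptyPayload, err
	}

	// AddToWatch stores workers already wrapped as SyncWorker.
	sw := w.(SyncWorker)

	res, err := sw.Exec(p)

	// Hand the worker back so the next caller can reuse it; PushWorker is O(1).
	ww.PushWorker(w)

	return res, err
}
```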