From ba5c562f9038ba434e655fb82c44597fcccaff16 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sun, 25 Oct 2020 15:55:51 +0300 Subject: - massive update in roadrunner 2.0 abstractions --- errors.go | 6 +- errors_test.go | 2 +- go.mod | 6 +- go.sum | 1 + pipe_factory_test.go | 8 +- plugins/config/provider.go | 2 +- plugins/config/viper.go | 3 +- plugins/events/broadcaster.go | 24 --- plugins/factory/app.go | 114 ++++++++----- plugins/factory/app_provider.go | 17 -- plugins/factory/config.go | 37 +++++ plugins/factory/factory.go | 73 --------- plugins/factory/hello.php | 1 - plugins/factory/tests/plugin_1.go | 4 +- plugins/factory/tests/plugin_2.go | 8 +- plugins/rpc/config.go | 3 + plugins/rpc/rpc.go | 36 ++--- pool.go | 96 ++++++----- pool_supervisor.go | 182 --------------------- pool_watcher.go | 131 +++++++++++++++ socket_factory.go | 8 + socket_factory_test.go | 61 +++---- static_pool.go | 210 +++++++++++++----------- static_pool_test.go | 114 ++++++------- sync_worker.go | 110 +++---------- sync_worker_test.go | 62 +++---- util/events.go | 26 +++ worker.go | 99 ++++++------ worker_test.go | 3 + worker_watcher.go | 329 ++++++++++++++++++++++++++++++++++++++ workers_watcher.go | 323 ------------------------------------- 31 files changed, 1005 insertions(+), 1094 deletions(-) delete mode 100644 plugins/events/broadcaster.go delete mode 100644 plugins/factory/app_provider.go create mode 100644 plugins/factory/config.go delete mode 100644 plugins/factory/factory.go delete mode 100644 plugins/factory/hello.php delete mode 100644 pool_supervisor.go create mode 100644 pool_watcher.go create mode 100644 util/events.go create mode 100644 worker_watcher.go delete mode 100644 workers_watcher.go diff --git a/errors.go b/errors.go index b9746702..52356549 100644 --- a/errors.go +++ b/errors.go @@ -1,11 +1,11 @@ package roadrunner -// TaskError is job level error (no WorkerProcess halt), wraps at top +// JobError is job level error (no WorkerProcess halt), wraps at top // of error context -type TaskError []byte +type JobError []byte // Error converts error context to string -func (te TaskError) Error() string { +func (te JobError) Error() string { return string(te) } diff --git a/errors_test.go b/errors_test.go index 69f1c9ec..75a86840 100644 --- a/errors_test.go +++ b/errors_test.go @@ -8,7 +8,7 @@ import ( ) func Test_JobError_Error(t *testing.T) { - e := TaskError([]byte("error")) + e := JobError([]byte("error")) assert.Equal(t, "error", e.Error()) } diff --git a/go.mod b/go.mod index ddf0fe98..322a7022 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/shirou/gopsutil v2.20.9+incompatible github.com/spf13/viper v1.7.1 - github.com/spiral/endure v1.0.0-beta8 + github.com/spiral/endure v1.0.0-beta9 github.com/spiral/goridge/v2 v2.4.5 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a @@ -20,3 +20,7 @@ require ( gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v2 v2.2.5 // indirect ) + +replace ( + github.com/spiral/endure v1.0.0-beta9 => ./../endure +) \ No newline at end of file diff --git a/go.sum b/go.sum index 72ca37a7..85dbeb85 100644 --- a/go.sum +++ b/go.sum @@ -186,6 +186,7 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/spiral/endure v1.0.0-beta8 h1:bKVe7F8CbvDZt8UYX3WoSV49OpFPHiM9Py55i7USPK8= github.com/spiral/endure v1.0.0-beta8/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ= +github.com/spiral/endure v1.0.0-beta9/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ= github.com/spiral/goridge/v2 v2.4.5 h1:rg4lLEJLrEh1Wj6G1qTsYVbYiQvig6mOR1F9GyDIGm8= github.com/spiral/goridge/v2 v2.4.5/go.mod h1:C/EZKFPON9lypi8QO7I5ObgVmrIzTmhZqFz/tmypcGc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/pipe_factory_test.go b/pipe_factory_test.go index 95eededa..ee2510f3 100644 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -101,7 +101,7 @@ func Test_Pipe_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -129,7 +129,7 @@ func Test_Pipe_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -178,7 +178,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(context.Background(), Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -205,7 +205,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/plugins/config/provider.go b/plugins/config/provider.go index 580231fd..ac33b3de 100644 --- a/plugins/config/provider.go +++ b/plugins/config/provider.go @@ -1,7 +1,7 @@ package config type Provider interface { - // Unmarshal configuration section into configuration object. + // UnmarshalKey reads configuration section into configuration object. // // func (h *HttpService) Init(cp config.Provider) error { // h.config := &HttpConfig{} diff --git a/plugins/config/viper.go b/plugins/config/viper.go index 0c34313c..4e85af6b 100644 --- a/plugins/config/viper.go +++ b/plugins/config/viper.go @@ -14,6 +14,7 @@ type ViperProvider struct { Prefix string } +// Inits config provider. func (v *ViperProvider) Init() error { v.viper = viper.New() @@ -49,7 +50,7 @@ func (v *ViperProvider) Overwrite(values map[string]string) error { return nil } -// +// UnmarshalKey reads configuration section into configuration object. func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error { err := v.viper.UnmarshalKey(name, &out) if err != nil { diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go deleted file mode 100644 index 778b307d..00000000 --- a/plugins/events/broadcaster.go +++ /dev/null @@ -1,24 +0,0 @@ -package events - -type EventListener interface { - Handle(event interface{}) -} - -type EventBroadcaster struct { - listeners []EventListener -} - -func NewEventBroadcaster() *EventBroadcaster { - return &EventBroadcaster{} -} - -func (eb *EventBroadcaster) AddListener(l EventListener) { - // todo: threadcase - eb.listeners = append(eb.listeners, l) -} - -func (eb *EventBroadcaster) Push(e interface{}) { - for _, l := range eb.listeners { - l.Handle(e) - } -} diff --git a/plugins/factory/app.go b/plugins/factory/app.go index 74d8d828..b6cdb3b3 100644 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -1,58 +1,70 @@ package factory import ( - "errors" + "context" "fmt" - "os" - "os/exec" - "strings" - "time" - + "github.com/fatih/color" + "github.com/spiral/endure/errors" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" + "log" + "os" + "os/exec" + "strings" ) -// AppConfig config combines factory, pool and cmd configurations. -type AppConfig struct { - Command string - User string - Group string - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. - RelayTimeout time.Duration +const ServiceName = "app" + +type Env map[string]string + +// AppFactory creates workers for the application. +type AppFactory interface { + NewCmdFactory(env Env) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) } +// App manages worker type App struct { - cfg AppConfig - configProvider config.Provider + cfg Config + factory roadrunner.Factory } -func (app *App) Init(provider config.Provider) error { - app.cfg = AppConfig{} - app.configProvider = provider - - err := app.configProvider.UnmarshalKey("app", &app.cfg) +// Init application provider. +func (app *App) Init(cfg config.Provider) error { + err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { return err } + app.cfg.InitDefaults() - if app.cfg.Relay == "" { - app.cfg.Relay = "pipes" + return nil +} + +func (app *App) Serve() chan error { + errCh := make(chan error, 1) + var err error + + app.factory, err = app.initFactory() + if err != nil { + errCh <- errors.E(errors.Op("init factory"), err) } - return nil + return errCh +} + +func (app *App) Stop() error { + if app.factory == nil { + return nil + } + + return app.factory.Close(context.Background()) } -func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { +func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) { var cmdArgs []string + // create command according to the config cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) @@ -75,15 +87,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { }, nil } -// todo ENV unused -func (app *App) NewFactory() (roadrunner.Factory, error) { +func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + return app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) +} + +func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + if err != nil { + return nil, err + } + + p.AddListener(func(event interface{}) { + if we, ok := event.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } + }) + + return p, nil +} + +func (app *App) initFactory() (roadrunner.Factory, error) { if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } lsn, err := util.CreateListener(app.cfg.Relay) @@ -98,7 +140,7 @@ func (app *App) NewFactory() (roadrunner.Factory, error) { case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil default: - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go deleted file mode 100644 index e13b267f..00000000 --- a/plugins/factory/app_provider.go +++ /dev/null @@ -1,17 +0,0 @@ -package factory - -import ( - "os/exec" - - "github.com/spiral/roadrunner/v2" -) - -type Env map[string]string - -type Spawner interface { - // CmdFactory create new command factory with given env variables. - NewCmd(env Env) (func() *exec.Cmd, error) - - // NewFactory inits new factory for workers. - NewFactory() (roadrunner.Factory, error) -} diff --git a/plugins/factory/config.go b/plugins/factory/config.go new file mode 100644 index 00000000..b2d1d0ad --- /dev/null +++ b/plugins/factory/config.go @@ -0,0 +1,37 @@ +package factory + +import "time" + +// Config config combines factory, pool and cmd configurations. +type Config struct { + // Command to run as application. + Command string + + // User to run application under. + User string + + // Group to run application under. + Group string + + // Env represents application environment. + Env Env + + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration +} + +func (cfg *Config) InitDefaults() { + if cfg.Relay == "" { + cfg.Relay = "pipes" + } + + if cfg.RelayTimeout == 0 { + cfg.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go deleted file mode 100644 index f7303b6d..00000000 --- a/plugins/factory/factory.go +++ /dev/null @@ -1,73 +0,0 @@ -package factory - -import ( - "context" - - "log" - - "github.com/fatih/color" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/events" -) - -type WorkerFactory interface { - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) -} - -type WFactory struct { - events *events.EventBroadcaster - app Spawner - wFactory roadrunner.Factory -} - -func (wf *WFactory) Init(app Spawner) (err error) { - wf.events = events.NewEventBroadcaster() - - wf.app = app - wf.wFactory, err = app.NewFactory() - if err != nil { - return nil - } - - return nil -} - -func (wf *WFactory) AddListener(l events.EventListener) { - wf.events.AddListener(l) -} - -func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { - cmd, err := wf.app.NewCmd(env) - if err != nil { - return nil, err - } - - p, err := roadrunner.NewPool(ctx, cmd, wf.wFactory, opt) - if err != nil { - return nil, err - } - - // TODO event to stop - go func() { - for e := range p.Events() { - wf.events.Push(e) - if we, ok := e.Payload.(roadrunner.WorkerEvent); ok { - if we.Event == roadrunner.EventWorkerLog { - log.Print(color.YellowString(string(we.Payload.([]byte)))) - } - } - } - }() - - return p, nil -} - -func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { - cmd, err := wf.app.NewCmd(env) - if err != nil { - return nil, err - } - - return wf.wFactory.SpawnWorkerWithContext(ctx, cmd()) -} diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php deleted file mode 100644 index c6199449..00000000 --- a/plugins/factory/hello.php +++ /dev/null @@ -1 +0,0 @@ -= float64(sps.maxWorkerTTL) { - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - - // after remove worker we should exclude it from further analysis - workers = append(workers[:i], workers[i+1:]...) - } - - if sps.maxWorkerMemory != 0 && s.MemoryUsage >= sps.maxWorkerMemory*MB { - // TODO events - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sps.maxWorkerMemory)} - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - workers = append(workers[:i], workers[i+1:]...) - continue - } - - // firs we check maxWorker idle - if sps.maxWorkerIdle != 0 { - // then check for the worker state - if workers[i].State().Value() != StateReady { - continue - } - /* - Calculate idle time - If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 - 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle - we are guessing that worker overlap idle time and has to be killed - */ - // get last used unix nano - lu := workers[i].State().LastUsed() - // convert last used to unixNano and sub time.now - res := int64(lu) - now.UnixNano() - // maxWorkerIdle more than diff between now and last used - if int64(sps.maxWorkerIdle)-res <= 0 { - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed worker idle time elapsed. actual idle time: %v, max idle time: %v", sps.maxWorkerIdle, res)} - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - workers = append(workers[:i], workers[i+1:]...) - } - } - - // the very last step is to calculate pool memory usage (except excluded workers) - totalUsedMemory += s.MemoryUsage - } - - // if current usage more than max allowed pool memory usage - if totalUsedMemory > sps.maxPoolMemory { - sps.pool.Destroy(ctx) - } - - return nil -} diff --git a/pool_watcher.go b/pool_watcher.go new file mode 100644 index 00000000..6eb614dc --- /dev/null +++ b/pool_watcher.go @@ -0,0 +1,131 @@ +package roadrunner + +import ( + "context" + "github.com/spiral/roadrunner/v2/util" + "time" +) + +const MB = 1024 * 1024 + +type supervisedPool struct { + cfg SupervisorConfig + events *util.EventHandler + pool Pool + stopCh chan struct{} +} + +func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool { + return &supervisedPool{ + cfg: cfg, + events: events, + pool: pool, + stopCh: make(chan struct{}), + } +} + +func (sp *supervisedPool) Start() { + go func() { + watchTout := time.NewTicker(sp.cfg.WatchTick) + for { + select { + case <-sp.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + sp.control() + } + } + }() +} + +func (sp *supervisedPool) Stop() { + sp.stopCh <- struct{}{} +} + +func (sp *supervisedPool) control() { + now := time.Now() + ctx := context.TODO() + + // THIS IS A COPY OF WORKERS + workers := sp.pool.Workers() + + for i := 0; i < len(workers); i++ { + if workers[i].State().Value() == StateInvalid { + continue + } + + s, err := WorkerProcessState(workers[i]) + if err != nil { + // worker not longer valid for supervision + continue + } + + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { + err = sp.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + return + } else { + sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) + } + + continue + } + + if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + // TODO events + //sp.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sp.maxWorkerMemory)} + err = sp.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + return + } else { + sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) + } + + continue + } + + // firs we check maxWorker idle + if sp.cfg.IdleTTL != 0 { + // then check for the worker state + if workers[i].State().Value() != StateReady { + continue + } + + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + + // get last used unix nano + lu := workers[i].State().LastUsed() + + // convert last used to unixNano and sub time.now + res := int64(lu) - now.UnixNano() + + // maxWorkerIdle more than diff between now and last used + if sp.cfg.IdleTTL-res <= 0 { + err = sp.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + return + } else { + sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]}) + } + } + } + + // the very last step is to calculate pool memory usage (except excluded workers) + //totalUsedMemory += s.MemoryUsage + } + + //// if current usage more than max allowed pool memory usage + //if totalUsedMemory > sp.maxPoolMemory { + // sp.pool.Destroy(ctx) + //} +} diff --git a/socket_factory.go b/socket_factory.go index 27558cce..0db7849b 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -2,6 +2,7 @@ package roadrunner import ( "context" + "github.com/shirou/gopsutil/process" "net" "os/exec" "strings" @@ -110,6 +111,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm w.Kill(context.Background()), w.Wait(context.Background()), ) + c <- socketSpawn{ w: nil, err: err, @@ -178,10 +180,16 @@ func (f *SocketFactory) Close(ctx context.Context) error { // waits for WorkerProcess to connect over socket and returns associated relay of timeout func (f *SocketFactory) findRelayWithContext(ctx context.Context, w WorkerBase) (*goridge.SocketRelay, error) { + ticker := time.NewTicker(time.Millisecond * 100) for { select { case <-ctx.Done(): return nil, ctx.Err() + case <-ticker.C: + _, err := process.NewProcess(int32(w.Pid())) + if err != nil { + return nil, err + } default: tmp, ok := f.relays.Load(w.Pid()) if !ok { diff --git a/socket_factory_test.go b/socket_factory_test.go index cfb95ca1..6ab87872 100644 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -98,28 +98,29 @@ func Test_Tcp_StartError(t *testing.T) { assert.Nil(t, w) } -// func Test_Tcp_Failboot(t *testing.T) { -// time.Sleep(time.Millisecond * 10) // to ensure free socket -// -// ls, err := net.Listen("tcp", "localhost:9007") -// if assert.NoError(t, err) { -// defer func() { -// err3 := ls.Close() -// if err3 != nil { -// t.Errorf("error closing the listener: error %v", err3) -// } -// }() -// } else { -// t.Skip("socket is busy") -// } -// -// cmd := exec.Command("php", "tests/failboot.php") -// -// w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(cmd) -// assert.Nil(t, w) -// assert.Error(t, err2) -// assert.Contains(t, err2.Error(), "failboot") -//} +func Test_Tcp_Failboot(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/failboot.php") + + w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err2) + assert.Contains(t, err2.Error(), "failboot") +} func Test_Tcp_Timeout(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket @@ -161,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) { cmd := exec.Command("php", "tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -208,7 +209,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -248,7 +249,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -304,7 +305,7 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*2).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "failboot") @@ -393,7 +394,7 @@ func Test_Unix_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) @@ -436,7 +437,7 @@ func Test_Unix_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -512,7 +513,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -580,7 +581,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/static_pool.go b/static_pool.go index 0c2352ad..31923134 100644 --- a/static_pool.go +++ b/static_pool.go @@ -3,23 +3,21 @@ package roadrunner import ( "context" "fmt" + "github.com/spiral/roadrunner/v2/util" "os/exec" "sync" "github.com/pkg/errors" ) -const ( - // StopRequest can be sent by worker to indicate that restart is required. - StopRequest = "{\"stop\":true}" -) +// StopRequest can be sent by worker to indicate that restart is required. +const StopRequest = "{\"stop\":true}" var bCtx = context.Background() // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { - // pool behaviour - cfg *Config + cfg Config // worker command creator cmd func() *exec.Cmd @@ -27,30 +25,31 @@ type StaticPool struct { // creates and connects to stack factory Factory + // distributes the events + events *util.EventHandler + // protects state of worker list, does not affect allocation muw sync.RWMutex - ww *WorkersWatcher + // manages worker states and TTLs + ww *workerWatcher - events chan PoolEvent -} -type PoolEvent struct { - Payload interface{} + // supervises memory and TTL of workers + sp *supervisedPool } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -// TODO why cfg is passed by pointer? -func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) { +func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) { cfg.InitDefaults() p := &StaticPool{ cfg: cfg, cmd: cmd, factory: factory, - events: make(chan PoolEvent), + events: &util.EventHandler{}, } - p.ww = NewWorkerWatcher(func(args ...interface{}) (WorkerBase, error) { + p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) { w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd()) if err != nil { return nil, err @@ -74,12 +73,21 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co return nil, err } + // todo: implement + //p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor) + //p.sp.Start() + return p, nil } +// AddListener connects event listener to the pool. +func (p *StaticPool) AddListener(listener util.EventListener) { + p.events.AddListener(listener) +} + // Config returns associated pool configuration. Immutable. -func (p *StaticPool) Config() Config { - return *p.cfg +func (p *StaticPool) GetConfig() Config { + return p.cfg } // Workers returns worker list associated with the pool. @@ -103,18 +111,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { rsp, err := sw.Exec(rqs) if err != nil { - errJ := p.checkMaxJobs(bCtx, w) - if errJ != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) - } // soft job errors are allowed - if _, jobError := err.(TaskError); jobError { - p.ww.PushWorker(w) + if _, jobError := err.(JobError); jobError { + if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { + err := p.ww.AllocateNew(bCtx) + if err != nil { + p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + } + + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + } + } else { + p.ww.PushWorker(w) + } + return EmptyPayload, err } sw.State().Set(StateInvalid) + p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) + if errS != nil { return EmptyPayload, fmt.Errorf("%v, %v", err, errS) } @@ -127,9 +147,10 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - return EmptyPayload, err + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } - return p.ExecWithContext(bCtx, rqs) + + return p.Exec(rqs) } if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { @@ -146,81 +167,81 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { } // Exec one task with given payload and context, returns result or error. -func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - // todo: why TODO passed here? - getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) - defer cancel() - w, err := p.ww.GetFreeWorker(getWorkerCtx) - if err != nil && errors.Is(err, ErrWatcherStopped) { - return EmptyPayload, ErrWatcherStopped - } else if err != nil { - return EmptyPayload, err - } - - sw := w.(SyncWorker) - - var execCtx context.Context - if p.cfg.ExecTTL != 0 { - var cancel2 context.CancelFunc - execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL) - defer cancel2() - } else { - execCtx = ctx - } - - rsp, err := sw.ExecWithContext(execCtx, rqs) - if err != nil { - errJ := p.checkMaxJobs(ctx, w) - if errJ != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) - } - // soft job errors are allowed - if _, jobError := err.(TaskError); jobError { - p.ww.PushWorker(w) - return EmptyPayload, err - } - - sw.State().Set(StateInvalid) - errS := w.Stop(ctx) - if errS != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errS) - } - - return EmptyPayload, err - } - - // worker want's to be terminated - if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - w.State().Set(StateInvalid) - err = w.Stop(ctx) - if err != nil { - return EmptyPayload, err - } - return p.ExecWithContext(ctx, rqs) - } - - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err = p.ww.AllocateNew(ctx) - if err != nil { - return EmptyPayload, err - } - } else { - p.muw.Lock() - p.ww.PushWorker(w) - p.muw.Unlock() - } - return rsp, nil -} +//func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { +// // todo: why TODO passed here? +// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) +// defer cancel() +// w, err := p.ww.GetFreeWorker(getWorkerCtx) +// if err != nil && errors.Is(err, ErrWatcherStopped) { +// return EmptyPayload, ErrWatcherStopped +// } else if err != nil { +// return EmptyPayload, err +// } +// +// sw := w.(SyncWorker) +// +// // todo: implement worker destroy +// //execCtx context.Context +// //if p.cfg.Supervisor.ExecTTL != 0 { +// // var cancel2 context.CancelFunc +// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL) +// // defer cancel2() +// //} else { +// // execCtx = ctx +// //} +// +// rsp, err := sw.Exec(rqs) +// if err != nil { +// errJ := p.checkMaxJobs(ctx, w) +// if errJ != nil { +// // todo: worker was not destroyed +// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) +// } +// +// // soft job errors are allowed +// if _, jobError := err.(JobError); jobError { +// p.ww.PushWorker(w) +// return EmptyPayload, err +// } +// +// sw.State().Set(StateInvalid) +// errS := w.Stop(ctx) +// if errS != nil { +// return EmptyPayload, fmt.Errorf("%v, %v", err, errS) +// } +// +// return EmptyPayload, err +// } +// +// // worker want's to be terminated +// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { +// w.State().Set(StateInvalid) +// err = w.Stop(ctx) +// if err != nil { +// return EmptyPayload, err +// } +// return p.ExecWithContext(ctx, rqs) +// } +// +// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { +// err = p.ww.AllocateNew(ctx) +// if err != nil { +// return EmptyPayload, err +// } +// } else { +// p.muw.Lock() +// p.ww.PushWorker(w) +// p.muw.Unlock() +// } +// +// return rsp, nil +//} // Destroy all underlying stack (but let them to complete the task). func (p *StaticPool) Destroy(ctx context.Context) { p.ww.Destroy(ctx) } -func (p *StaticPool) Events() chan PoolEvent { - return p.events -} - // allocate required number of stack func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { var workers []WorkerBase @@ -243,6 +264,7 @@ func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { err := p.ww.AllocateNew(ctx) if err != nil { + p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) return err } } diff --git a/static_pool_test.go b/static_pool_test.go index ce9e6820..4a0c483a 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -2,7 +2,6 @@ package roadrunner import ( "context" - "fmt" "log" "os/exec" "runtime" @@ -18,7 +17,6 @@ var cfg = Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, } func Test_NewPool(t *testing.T) { @@ -27,12 +25,10 @@ func Test_NewPool(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) - assert.Equal(t, cfg, p.Config()) - defer p.Destroy(ctx) assert.NotNil(t, p) @@ -43,7 +39,7 @@ func Test_StaticPool_Invalid(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") }, NewPipeFactory(), - &cfg, + cfg, ) assert.Nil(t, p) @@ -55,7 +51,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -71,7 +67,7 @@ func Test_StaticPool_Echo(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -79,7 +75,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -95,7 +91,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -103,7 +99,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -119,7 +115,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -127,7 +123,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -143,31 +139,31 @@ func Test_StaticPool_JobError(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, TaskError{}, err) + assert.IsType(t, JobError{}, err) assert.Equal(t, "hello", err.Error()) } // TODO temporary commented, figure out later -// func Test_StaticPool_Broken_Replace(t *testing.T) { +//func Test_StaticPool_Broken_Replace(t *testing.T) { // ctx := context.Background() // p, err := NewPool( // ctx, // func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, // NewPipeFactory(), -// &cfg, +// cfg, // ) // assert.NoError(t, err) // assert.NotNil(t, p) @@ -177,6 +173,10 @@ func Test_StaticPool_JobError(t *testing.T) { // var i int64 // atomic.StoreInt64(&i, 10) // +// p.AddListener(func(event interface{}) { +// +// }) +// // go func() { // for { // select { @@ -206,14 +206,14 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -226,17 +226,13 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { // Consume pool events wg := sync.WaitGroup{} wg.Add(1) - go func() { - for true { - select { - case ev := <-p.Events(): - fmt.Println(ev) - if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct { - wg.Done() - } + p.AddListener(func(event interface{}) { + if pe, ok := event.(PoolEvent); ok { + if pe.Event == EventWorkerConstruct { + wg.Done() } } - }() + }) // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill(ctx) @@ -258,11 +254,10 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, - ExecTTL: time.Second * 4, }, ) assert.Error(t, err) @@ -275,12 +270,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) assert.NoError(t, err) @@ -291,11 +285,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, _ := p.Exec(Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -314,11 +308,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 15, }, ) assert.NoError(t, err) @@ -326,26 +319,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { assert.NotNil(t, p) - go func() { - for { - select { - case ev := <-p.Events(): - fmt.Println(ev) - } - } - }() - var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -364,11 +348,10 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) @@ -376,7 +359,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -387,11 +370,10 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) @@ -399,7 +381,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err := p.Exec(Payload{Body: []byte("100")}) if err != nil { t.Errorf("error executing payload: error %v", err) } @@ -407,7 +389,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 10) p.Destroy(ctx) - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -418,11 +400,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) assert.NoError(t, err) @@ -434,7 +415,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { w.State().Set(StateErrored) } - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) } @@ -444,11 +425,10 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) @@ -464,7 +444,7 @@ func Benchmark_Pool_Echo(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) if err != nil { b.Fatal(err) @@ -473,7 +453,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -486,11 +466,10 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) defer p.Destroy(ctx) @@ -500,7 +479,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -517,12 +496,11 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) defer p.Destroy(ctx) @@ -530,7 +508,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/sync_worker.go b/sync_worker.go index de9491d6..cbc2cc0b 100644 --- a/sync_worker.go +++ b/sync_worker.go @@ -3,6 +3,7 @@ package roadrunner import ( "context" "fmt" + "github.com/spiral/roadrunner/v2/util" "time" "github.com/pkg/errors" @@ -14,19 +15,17 @@ var EmptyPayload = Payload{} type SyncWorker interface { // WorkerBase provides basic functionality for the SyncWorker WorkerBase + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs Payload) (Payload, error) - - // ExecWithContext allow to set ExecTTL - ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) } -type taskWorker struct { +type syncWorker struct { w WorkerBase } func NewSyncWorker(w WorkerBase) (SyncWorker, error) { - return &taskWorker{ + return &syncWorker{ w: w, }, nil } @@ -36,68 +35,9 @@ type twexec struct { err error } -func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - c := make(chan twexec) - go func() { - if len(rqs.Body) == 0 && len(rqs.Context) == 0 { - c <- twexec{ - payload: EmptyPayload, - err: fmt.Errorf("payload can not be empty"), - } - return - } - - if tw.w.State().Value() != StateReady { - c <- twexec{ - payload: EmptyPayload, - err: fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()), - } - return - } - - // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(StateWorking) - - rsp, err := tw.execPayload(rqs) - if err != nil { - if _, ok := err.(TaskError); !ok { - tw.w.State().Set(StateErrored) - tw.w.State().RegisterExec() - } - c <- twexec{ - payload: EmptyPayload, - err: err, - } - return - } - - tw.w.State().Set(StateReady) - tw.w.State().RegisterExec() - c <- twexec{ - payload: rsp, - err: nil, - } - return - }() - - for { - select { - case <-ctx.Done(): - return EmptyPayload, ctx.Err() - case res := <-c: - if res.err != nil { - return EmptyPayload, res.err - } - - return res.payload, nil - } - } -} - -// -func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { - if len(rqs.Body) == 0 && len(rqs.Context) == 0 { +// Exec payload without TTL timeout. +func (tw *syncWorker) Exec(p Payload) (Payload, error) { + if len(p.Body) == 0 && len(p.Context) == 0 { return EmptyPayload, fmt.Errorf("payload can not be empty") } @@ -109,9 +49,9 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) tw.w.State().Set(StateWorking) - rsp, err := tw.execPayload(rqs) + rsp, err := tw.execPayload(p) if err != nil { - if _, ok := err.(TaskError); !ok { + if _, ok := err.(JobError); !ok { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -124,7 +64,7 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { return rsp, nil } -func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { +func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) { // two things; todo: merge if err := sendControl(tw.w.Relay(), rqs.Context); err != nil { return EmptyPayload, errors.Wrap(err, "header error") @@ -147,7 +87,7 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { } if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, TaskError(rsp.Context) + return EmptyPayload, JobError(rsp.Context) } // add streaming support :) @@ -158,46 +98,46 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { return rsp, nil } -func (tw *taskWorker) String() string { +func (tw *syncWorker) String() string { return tw.w.String() } -func (tw *taskWorker) Created() time.Time { - return tw.w.Created() +func (tw *syncWorker) Pid() int64 { + return tw.w.Pid() } -func (tw *taskWorker) Events() <-chan WorkerEvent { - return tw.w.Events() +func (tw *syncWorker) Created() time.Time { + return tw.w.Created() } -func (tw *taskWorker) Pid() int64 { - return tw.w.Pid() +func (tw *syncWorker) AddListener(listener util.EventListener) { + tw.w.AddListener(listener) } -func (tw *taskWorker) State() State { +func (tw *syncWorker) State() State { return tw.w.State() } -func (tw *taskWorker) Start() error { +func (tw *syncWorker) Start() error { return tw.w.Start() } -func (tw *taskWorker) Wait(ctx context.Context) error { +func (tw *syncWorker) Wait(ctx context.Context) error { return tw.w.Wait(ctx) } -func (tw *taskWorker) Stop(ctx context.Context) error { +func (tw *syncWorker) Stop(ctx context.Context) error { return tw.w.Stop(ctx) } -func (tw *taskWorker) Kill(ctx context.Context) error { +func (tw *syncWorker) Kill(ctx context.Context) error { return tw.w.Kill(ctx) } -func (tw *taskWorker) Relay() goridge.Relay { +func (tw *syncWorker) Relay() goridge.Relay { return tw.w.Relay() } -func (tw *taskWorker) AttachRelay(rl goridge.Relay) { +func (tw *syncWorker) AttachRelay(rl goridge.Relay) { tw.w.AttachRelay(rl) } diff --git a/sync_worker_test.go b/sync_worker_test.go index f4868009..ad1513d7 100644 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -2,13 +2,10 @@ package roadrunner import ( "context" - "errors" + "github.com/stretchr/testify/assert" "os/exec" "sync" "testing" - "time" - - "github.com/stretchr/testify/assert" ) func Test_Echo(t *testing.T) { @@ -34,7 +31,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -65,7 +62,7 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := syncWorker.ExecWithContext(ctx, EmptyPayload) + res, err := syncWorker.Exec(EmptyPayload) assert.Error(t, err) assert.Nil(t, res.Body) @@ -84,7 +81,6 @@ func Test_NotStarted_String(t *testing.T) { } func Test_NotStarted_Exec(t *testing.T) { - ctx := context.Background() cmd := exec.Command("php", "tests/client.php", "echo", "pipes") w, _ := InitBaseWorker(cmd) @@ -94,7 +90,7 @@ func Test_NotStarted_Exec(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -143,7 +139,7 @@ func Test_Echo_Slow(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -164,28 +160,34 @@ func Test_Broken(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - go func() { - assert.NotNil(t, w) - tt := time.NewTimer(time.Second * 10) - defer wg.Done() - for { - select { - case ev := <-w.Events(): - assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()") - return - case <-tt.C: - assert.Error(t, errors.New("no events from worker")) - return - } - } - }() + + w.AddListener(func(event interface{}) { + assert.Contains(t, string(event.(WorkerEvent).Payload.([]byte)), "undefined_function()") + wg.Done() + }) + + //go func() { + // assert.NotNil(t, w) + // tt := time.NewTimer(time.Second * 10) + // defer wg.Done() + // for { + // select { + // case ev := <-w.Events(): + // assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()") + // return + // case <-tt.C: + // assert.Error(t, errors.New("no events from worker")) + // return + // } + // } + //}() syncWorker, err := NewSyncWorker(w) if err != nil { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -215,12 +217,12 @@ func Test_Error(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, TaskError{}, err) + assert.IsType(t, JobError{}, err) assert.Equal(t, "hello", err.Error()) } @@ -244,19 +246,19 @@ func Test_NumExecs(t *testing.T) { t.Fatal(err) } - _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(1), w.State().NumExecs()) - _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(2), w.State().NumExecs()) - _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/util/events.go b/util/events.go new file mode 100644 index 00000000..9e12c4f7 --- /dev/null +++ b/util/events.go @@ -0,0 +1,26 @@ +package util + +// Event listener listens for the events produced by worker, worker pool or other servce. +type EventListener func(event interface{}) + +// EventHandler helps to broadcast events to multiple listeners. +type EventHandler struct { + listeners []EventListener +} + +// NumListeners returns number of event listeners. +func (eb *EventHandler) NumListeners() int { + return len(eb.listeners) +} + +// AddListener registers new event listener. +func (eb *EventHandler) AddListener(listener EventListener) { + eb.listeners = append(eb.listeners, listener) +} + +// Push broadcast events across all event listeners. +func (eb *EventHandler) Push(e interface{}) { + for _, listener := range eb.listeners { + listener(e) + } +} diff --git a/worker.go b/worker.go index c0a735c2..05b5712d 100644 --- a/worker.go +++ b/worker.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/spiral/roadrunner/v2/util" "os" "os/exec" "strconv" @@ -15,6 +16,12 @@ import ( "go.uber.org/multierr" ) +const ( + // WaitDuration - for how long error buffer should attempt to aggregate error messages + // before merging output together since lastError update (required to keep error update together). + WaitDuration = 25 * time.Millisecond +) + // EventWorkerKill thrown after WorkerProcess is being forcefully killed. const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. @@ -22,38 +29,31 @@ const ( // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog - - // EventWorkerWaitDone triggered when worker exit from process Wait - EventWorkerWaitDone // todo: implemented? - - EventWorkerBufferClosed - - EventRelayCloseError - - EventWorkerProcessError ) -const ( - // WaitDuration - for how long error buffer should attempt to aggregate error messages - // before merging output together since lastError update (required to keep error update together). - WaitDuration = 100 * time.Millisecond -) - -// todo: write comment +// WorkerEvent wraps worker events. type WorkerEvent struct { - Event int64 - Worker WorkerBase + // Event id, see below. + Event int64 + + // Worker triggered the event. + Worker WorkerBase + + // Event specific payload. Payload interface{} } type WorkerBase interface { fmt.Stringer - Created() time.Time + // Pid returns worker pid. + Pid() int64 - Events() <-chan WorkerEvent + // Created returns time worker was created at. + Created() time.Time - Pid() int64 + // AddListener attaches listener to consume worker events. + AddListener(listener util.EventListener) // State return receive-only WorkerProcess state object, state can be used to safely access // WorkerProcess status, time when status changed and number of WorkerProcess executions. @@ -88,7 +88,7 @@ type WorkerProcess struct { created time.Time // updates parent supervisor or pool about WorkerProcess events - events chan WorkerEvent + events *util.EventHandler // state holds information about current WorkerProcess state, // number of WorkerProcess executions, buf status change time. @@ -129,7 +129,7 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { } w := &WorkerProcess{ created: time.Now(), - events: make(chan WorkerEvent, 10), + events: &util.EventHandler{}, cmd: cmd, state: newState(StateInactive), } @@ -142,12 +142,23 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { return w, nil } +// Pid returns worker pid. +func (w *WorkerProcess) Pid() int64 { + return int64(w.pid) +} + +// Created returns time worker was created at. func (w *WorkerProcess) Created() time.Time { return w.created } -func (w *WorkerProcess) Pid() int64 { - return int64(w.pid) +// AddListener registers new worker event listener. +func (w *WorkerProcess) AddListener(listener util.EventListener) { + w.events.AddListener(listener) + + w.errBuffer.mu.Lock() + w.errBuffer.enable = true + w.errBuffer.mu.Unlock() } // State return receive-only WorkerProcess state object, state can be used to safely access @@ -195,10 +206,6 @@ func (w *WorkerProcess) Start() error { return nil } -func (w *WorkerProcess) Events() <-chan WorkerEvent { - return w.events -} - // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails @@ -208,15 +215,8 @@ func (w *WorkerProcess) Wait(ctx context.Context) error { w.endState = w.cmd.ProcessState if err != nil { w.state.Set(StateErrored) - // if there are messages in the events channel, read it - // TODO potentially danger place - if len(w.events) > 0 { - select { - case ev := <-w.events: - err = multierr.Append(err, errors.New(string(ev.Payload.([]byte)))) - } - } - // if no errors in the events, error might be in the errbuffer + + // if no errors in the events, error might be in the errBuffer if w.errBuffer.Len() > 0 { err = multierr.Append(err, errors.New(w.errBuffer.String())) } @@ -250,6 +250,7 @@ func (w *WorkerProcess) closeRelay() error { // Stop sends soft termination command to the WorkerProcess and waits for process completion. func (w *WorkerProcess) Stop(ctx context.Context) error { c := make(chan error) + go func() { var err error w.errBuffer.Close() @@ -264,6 +265,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error { w.state.Set(StateStopped) c <- nil }() + select { case <-ctx.Done(): return ctx.Err() @@ -290,16 +292,17 @@ func (w *WorkerProcess) Kill(ctx context.Context) error { } func (w *WorkerProcess) logCallback(log []byte) { - w.events <- WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log} + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}) } // thread safe errBuffer type errBuffer struct { - mu sync.RWMutex - buf []byte - last int - wait *time.Timer - // todo remove update + enable bool + mu sync.RWMutex + buf []byte + last int + wait *time.Timer + // todo: remove update update chan interface{} stop chan interface{} logCallback func(log []byte) @@ -321,7 +324,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb.wait.Reset(WaitDuration) case <-eb.wait.C: eb.mu.Lock() - if len(eb.buf) > eb.last { + if eb.enable && len(eb.buf) > eb.last { eb.logCallback(eb.buf[eb.last:]) eb.buf = eb.buf[0:0] eb.last = len(eb.buf) @@ -331,11 +334,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb.wait.Stop() eb.mu.Lock() - if len(eb.buf) > eb.last { - if eb == nil || eb.logCallback == nil { - eb.mu.Unlock() - return - } + if eb.enable && len(eb.buf) > eb.last { eb.logCallback(eb.buf[eb.last:]) eb.last = len(eb.buf) } diff --git a/worker_test.go b/worker_test.go index a90b7ef2..d2744345 100644 --- a/worker_test.go +++ b/worker_test.go @@ -91,6 +91,7 @@ func TestErrBuffer_Write_Event(t *testing.T) { assert.Equal(t, []byte("hello\n"), log) wg.Done() } + buf.enable = true _, err := buf.Write([]byte("hello\n")) if err != nil { @@ -116,6 +117,8 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) { assert.Equal(t, []byte("hello\nending"), log) wg.Done() } + buf.enable = true + _, err := buf.Write([]byte("hel")) if err != nil { t.Errorf("fail to write: error %v", err) diff --git a/worker_watcher.go b/worker_watcher.go new file mode 100644 index 00000000..773f7745 --- /dev/null +++ b/worker_watcher.go @@ -0,0 +1,329 @@ +package roadrunner + +import ( + "context" + "errors" + "github.com/spiral/roadrunner/v2/util" + "sync" + "time" +) + +var ErrWatcherStopped = errors.New("watcher stopped") + +type Stack struct { + workers []WorkerBase + mutex sync.RWMutex + destroy bool +} + +func NewWorkersStack() *Stack { + return &Stack{ + workers: make([]WorkerBase, 0, 12), + } +} + +func (stack *Stack) Reset() { + stack.mutex.Lock() + defer stack.mutex.Unlock() + + stack.workers = nil +} + +func (stack *Stack) Push(w WorkerBase) { + stack.mutex.Lock() + defer stack.mutex.Unlock() + stack.workers = append(stack.workers, w) +} + +func (stack *Stack) IsEmpty() bool { + stack.mutex.Lock() + defer stack.mutex.Unlock() + + return len(stack.workers) == 0 +} + +func (stack *Stack) Pop() (WorkerBase, bool) { + stack.mutex.Lock() + defer stack.mutex.Unlock() + // do not release new stack + if stack.destroy { + return nil, true + } + + if len(stack.workers) == 0 { + return nil, false + } + + w := stack.workers[len(stack.workers)-1] + stack.workers = stack.workers[:len(stack.workers)-1] + + return w, false +} + +type WorkerWatcher interface { + // AddToWatch used to add stack to wait its state + AddToWatch(ctx context.Context, workers []WorkerBase) error + + // GetFreeWorker provide first free worker + GetFreeWorker(ctx context.Context) (WorkerBase, error) + + // PutWorker enqueues worker back + PushWorker(w WorkerBase) + + // AllocateNew used to allocate new worker and put in into the WorkerWatcher + AllocateNew(ctx context.Context) error + + // Destroy destroys the underlying stack + Destroy(ctx context.Context) + + // WorkersList return all stack w/o removing it from internal storage + WorkersList() []WorkerBase + + // RemoveWorker remove worker from the stack + RemoveWorker(ctx context.Context, wb WorkerBase) error +} + +// workerCreateFunc can be nil, but in that case, dead stack will not be replaced +func newWorkerWatcher( + allocator func(args ...interface{}) (WorkerBase, error), + numWorkers int64, + events *util.EventHandler, +) *workerWatcher { + ww := &workerWatcher{ + stack: NewWorkersStack(), + allocator: allocator, + initialNumWorkers: numWorkers, + actualNumWorkers: numWorkers, + events: events, + } + + return ww +} + +type workerWatcher struct { + mutex sync.RWMutex + stack *Stack + allocator func(args ...interface{}) (WorkerBase, error) + initialNumWorkers int64 + actualNumWorkers int64 + events *util.EventHandler +} + +func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error { + for i := 0; i < len(workers); i++ { + sw, err := NewSyncWorker(workers[i]) + if err != nil { + return err + } + ww.stack.Push(sw) + sw.AddListener(ww.events.Push) + + go func(swc WorkerBase) { + ww.wait(ctx, swc) + }(sw) + } + return nil +} + +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) { + // thread safe operation + w, stop := ww.stack.Pop() + if stop { + return nil, ErrWatcherStopped + } + + // handle worker remove state + // in this state worker is destroyed by supervisor + if w != nil && w.State().Value() == StateRemove { + err := ww.RemoveWorker(ctx, w) + if err != nil { + return nil, err + } + // try to get next + return ww.GetFreeWorker(ctx) + } + + // no free stack + if w == nil { + tout := time.NewTicker(time.Second * 180) + defer tout.Stop() + for { + select { + default: + w, stop = ww.stack.Pop() + if stop { + return nil, ErrWatcherStopped + } + if w == nil { + continue + } + ww.decreaseNumOfActualWorkers() + return w, nil + case <-tout.C: + return nil, errors.New("no free stack") + } + } + } + + ww.decreaseNumOfActualWorkers() + return w, nil +} + +func (ww *workerWatcher) AllocateNew(ctx context.Context) error { + ww.stack.mutex.Lock() + sw, err := ww.allocator() + if err != nil { + return err + } + + ww.addToWatch(sw) + ww.stack.mutex.Unlock() + ww.PushWorker(sw) + + ww.events.Push(PoolEvent{ + Event: EventWorkerConstruct, + Payload: sw, + }) + + return nil +} + +func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error { + ww.stack.mutex.Lock() + defer ww.stack.mutex.Unlock() + pid := wb.Pid() + for i := 0; i < len(ww.stack.workers); i++ { + if ww.stack.workers[i].Pid() == pid { + // found in the stack + // remove worker + ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) + ww.decreaseNumOfActualWorkers() + + wb.State().Set(StateInvalid) + err := wb.Kill(ctx) + if err != nil { + return err + } + break + } + } + // worker currently handle request, set state Remove + wb.State().Set(StateRemove) + return nil +} + +// O(1) operation +func (ww *workerWatcher) PushWorker(w WorkerBase) { + ww.mutex.Lock() + ww.actualNumWorkers++ + ww.mutex.Unlock() + ww.stack.Push(w) +} + +func (ww *workerWatcher) ReduceWorkersCount() { + ww.decreaseNumOfActualWorkers() +} + +// Destroy all underlying stack (but let them to complete the task) +func (ww *workerWatcher) Destroy(ctx context.Context) { + ww.stack.mutex.Lock() + ww.stack.destroy = true + ww.stack.mutex.Unlock() + + tt := time.NewTicker(time.Millisecond * 100) + for { + select { + case <-tt.C: + ww.stack.mutex.Lock() + if len(ww.stack.workers) != int(ww.actualNumWorkers) { + ww.stack.mutex.Unlock() + continue + } + ww.stack.mutex.Unlock() + // unnecessary mutex, but + // just to make sure. All stack at this moment are in the stack + // Pop operation is blocked, push can't be done, since it's not possible to pop + ww.stack.mutex.Lock() + for i := 0; i < len(ww.stack.workers); i++ { + // set state for the stack in the stack (unused at the moment) + ww.stack.workers[i].State().Set(StateDestroyed) + } + ww.stack.mutex.Unlock() + tt.Stop() + // clear + ww.stack.Reset() + return + } + } +} + +// Warning, this is O(n) operation +func (ww *workerWatcher) WorkersList() []WorkerBase { + return ww.stack.workers +} + +func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { + err := w.Wait(ctx) + if err != nil { + ww.events.Push(WorkerEvent{ + Event: EventWorkerError, + Worker: w, + Payload: err, + }) + } + + if w.State().Value() == StateDestroyed { + // worker was manually destroyed, no need to replace + return + } + + pid := w.Pid() + ww.stack.mutex.Lock() + for i := 0; i < len(ww.stack.workers); i++ { + // worker in the stack, reallocating + if ww.stack.workers[i].Pid() == pid { + + ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) + ww.decreaseNumOfActualWorkers() + ww.stack.mutex.Unlock() + + err = ww.AllocateNew(ctx) + if err != nil { + ww.events.Push(PoolEvent{ + Event: EventPoolError, + Payload: err, + }) + } + + return + } + } + + ww.stack.mutex.Unlock() + + // worker not in the stack (not returned), forget and allocate new + err = ww.AllocateNew(ctx) + if err != nil { + ww.events.Push(PoolEvent{ + Event: EventPoolError, + Payload: err, + }) + return + } + + return +} + +func (ww *workerWatcher) addToWatch(wb WorkerBase) { + ww.mutex.Lock() + defer ww.mutex.Unlock() + go func() { + ww.wait(context.Background(), wb) + }() +} + +func (ww *workerWatcher) decreaseNumOfActualWorkers() { + ww.mutex.Lock() + ww.actualNumWorkers-- + ww.mutex.Unlock() +} diff --git a/workers_watcher.go b/workers_watcher.go deleted file mode 100644 index d9d27196..00000000 --- a/workers_watcher.go +++ /dev/null @@ -1,323 +0,0 @@ -package roadrunner - -import ( - "context" - "errors" - "sync" - "time" -) - -var ErrWatcherStopped = errors.New("watcher stopped") - -type Stack struct { - workers []WorkerBase - mutex sync.RWMutex - destroy bool -} - -func NewWorkersStack() *Stack { - return &Stack{ - workers: make([]WorkerBase, 0, 12), - } -} - -func (stack *Stack) Reset() { - stack.mutex.Lock() - defer stack.mutex.Unlock() - - stack.workers = nil -} - -func (stack *Stack) Push(w WorkerBase) { - stack.mutex.Lock() - defer stack.mutex.Unlock() - stack.workers = append(stack.workers, w) -} - -func (stack *Stack) IsEmpty() bool { - stack.mutex.Lock() - defer stack.mutex.Unlock() - - return len(stack.workers) == 0 -} - -func (stack *Stack) Pop() (WorkerBase, bool) { - stack.mutex.Lock() - defer stack.mutex.Unlock() - // do not release new stack - if stack.destroy { - return nil, true - } - - if len(stack.workers) == 0 { - return nil, false - } - - w := stack.workers[len(stack.workers)-1] - stack.workers = stack.workers[:len(stack.workers)-1] - - return w, false -} - -type WorkersWatcher struct { - mutex sync.RWMutex - stack *Stack - allocator func(args ...interface{}) (WorkerBase, error) - initialNumWorkers int64 - actualNumWorkers int64 - events chan PoolEvent -} - -type WorkerWatcher interface { - // AddToWatch used to add stack to wait its state - AddToWatch(ctx context.Context, workers []WorkerBase) error - // GetFreeWorker provide first free worker - GetFreeWorker(ctx context.Context) (WorkerBase, error) - // PutWorker enqueues worker back - PushWorker(w WorkerBase) - // AllocateNew used to allocate new worker and put in into the WorkerWatcher - AllocateNew(ctx context.Context) error - // Destroy destroys the underlying stack - Destroy(ctx context.Context) - // WorkersList return all stack w/o removing it from internal storage - WorkersList() []WorkerBase - // RemoveWorker remove worker from the stack - RemoveWorker(ctx context.Context, wb WorkerBase) error -} - -// workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher { - // todo check if events not nil - ww := &WorkersWatcher{ - stack: NewWorkersStack(), - allocator: allocator, - initialNumWorkers: numWorkers, - actualNumWorkers: numWorkers, - events: events, - } - - return ww -} - -func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error { - for i := 0; i < len(workers); i++ { - sw, err := NewSyncWorker(workers[i]) - if err != nil { - return err - } - ww.stack.Push(sw) - ww.watch(sw) - - go func(swc WorkerBase) { - ww.wait(ctx, swc) - }(sw) - } - return nil -} - -func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) { - // thread safe operation - w, stop := ww.stack.Pop() - if stop { - return nil, ErrWatcherStopped - } - // handle worker remove state - // in this state worker is destroyed by supervisor - if w != nil && w.State().Value() == StateRemove { - err := ww.RemoveWorker(ctx, w) - if err != nil { - return nil, err - } - // try to get next - return ww.GetFreeWorker(ctx) - } - // no free stack - if w == nil { - tout := time.NewTicker(time.Second * 180) - defer tout.Stop() - for { - select { - default: - w, stop = ww.stack.Pop() - if stop { - return nil, ErrWatcherStopped - } - if w == nil { - continue - } - ww.decreaseNumOfActualWorkers() - return w, nil - case <-tout.C: - return nil, errors.New("no free stack") - } - } - } - ww.decreaseNumOfActualWorkers() - return w, nil -} - -func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error { - ww.stack.mutex.Lock() - sw, err := ww.allocator() - if err != nil { - return err - } - ww.addToWatch(sw) - ww.stack.mutex.Unlock() - ww.PushWorker(sw) - return nil -} - -func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error { - ww.stack.mutex.Lock() - defer ww.stack.mutex.Unlock() - pid := wb.Pid() - for i := 0; i < len(ww.stack.workers); i++ { - if ww.stack.workers[i].Pid() == pid { - // found in the stack - // remove worker - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.decreaseNumOfActualWorkers() - - wb.State().Set(StateInvalid) - err := wb.Kill(ctx) - if err != nil { - return err - } - break - } - } - // worker currently handle request, set state Remove - wb.State().Set(StateRemove) - return nil -} - -// O(1) operation -func (ww *WorkersWatcher) PushWorker(w WorkerBase) { - ww.mutex.Lock() - ww.actualNumWorkers++ - ww.mutex.Unlock() - ww.stack.Push(w) -} - -func (ww *WorkersWatcher) ReduceWorkersCount() { - ww.decreaseNumOfActualWorkers() -} - -// Destroy all underlying stack (but let them to complete the task) -func (ww *WorkersWatcher) Destroy(ctx context.Context) { - ww.stack.mutex.Lock() - ww.stack.destroy = true - ww.stack.mutex.Unlock() - - tt := time.NewTicker(time.Millisecond * 100) - for { - select { - case <-tt.C: - ww.stack.mutex.Lock() - if len(ww.stack.workers) != int(ww.actualNumWorkers) { - ww.stack.mutex.Unlock() - continue - } - ww.stack.mutex.Unlock() - // unnecessary mutex, but - // just to make sure. All stack at this moment are in the stack - // Pop operation is blocked, push can't be done, since it's not possible to pop - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // set state for the stack in the stack (unused at the moment) - ww.stack.workers[i].State().Set(StateDestroyed) - } - ww.stack.mutex.Unlock() - tt.Stop() - // clear - ww.stack.Reset() - return - } - } -} - -// Warning, this is O(n) operation -func (ww *WorkersWatcher) WorkersList() []WorkerBase { - return ww.stack.workers -} - -func (ww *WorkersWatcher) wait(ctx context.Context, w WorkerBase) { - err := w.Wait(ctx) - if err != nil { - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerError, - Worker: w, - Payload: err, - }} - } - // If not destroyed, reallocate - if w.State().Value() != StateDestroyed { - pid := w.Pid() - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // worker in the stack, reallocating - if ww.stack.workers[i].Pid() == pid { - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.decreaseNumOfActualWorkers() - ww.stack.mutex.Unlock() - err = ww.AllocateNew(ctx) - if err != nil { - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerError, - Worker: w, - Payload: err, - }} - return - } - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerConstruct, - Worker: nil, - Payload: nil, - }} - return - } - } - ww.stack.mutex.Unlock() - // worker not in the stack (not returned), forget and allocate new - err = ww.AllocateNew(ctx) - if err != nil { - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerError, - Worker: w, - Payload: err, - }} - return - } - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerConstruct, - Worker: nil, - Payload: nil, - }} - } - return -} - -func (ww *WorkersWatcher) addToWatch(wb WorkerBase) { - ww.mutex.Lock() - defer ww.mutex.Unlock() - go func() { - ww.wait(context.Background(), wb) - }() -} - -func (ww *WorkersWatcher) decreaseNumOfActualWorkers() { - ww.mutex.Lock() - ww.actualNumWorkers-- - ww.mutex.Unlock() -} - -func (ww *WorkersWatcher) watch(swc WorkerBase) { - // todo make event to stop function - go func() { - select { - case ev := <-swc.Events(): - ww.events <- PoolEvent{Payload: ev} - } - }() -} -- cgit v1.2.3