diff options
34 files changed, 798 insertions, 889 deletions
@@ -1,11 +1,11 @@ package roadrunner -// TaskError is job level error (no WorkerProcess halt), wraps at top +// JobError is job level error (no WorkerProcess halt), wraps at top // of error context -type TaskError []byte +type JobError []byte // Error converts error context to string -func (te TaskError) Error() string { +func (te JobError) Error() string { return string(te) } diff --git a/errors/go.mod b/errors/go.mod index 60dac691..1eaacc23 100755 --- a/errors/go.mod +++ b/errors/go.mod @@ -1 +1,3 @@ module github.com/48d90782/errors + +go 1.15 diff --git a/errors_test.go b/errors_test.go index 69f1c9ec..75a86840 100755 --- a/errors_test.go +++ b/errors_test.go @@ -8,7 +8,7 @@ import ( ) func Test_JobError_Error(t *testing.T) { - e := TaskError([]byte("error")) + e := JobError([]byte("error")) assert.Equal(t, "error", e.Error()) } @@ -10,7 +10,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/shirou/gopsutil v2.20.9+incompatible github.com/spf13/viper v1.7.1 - github.com/spiral/endure v1.0.0-beta8 + github.com/spiral/endure v1.0.0-beta10 github.com/spiral/goridge/v2 v2.4.5 github.com/stretchr/testify v1.6.1 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a @@ -35,6 +35,8 @@ github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/corona10/goimagehash v1.0.2 h1:pUfB0LnsJASMPGEZLj7tGY251vF+qLGqOgEP4rUs6kA= +github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawkhRnnX0D1bvVI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -42,6 +44,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8= +github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -52,8 +56,12 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/goccy/go-graphviz v0.0.8 h1:hYQikvj368s8+rmfzFOZeiCXvSocGH7rfAyLTOy/7AM= +github.com/goccy/go-graphviz v0.0.8/go.mod h1:wXVsXxmyMQU6TN3zGRttjNn3h+iCAS7xQFC6TlNvLhk= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= +github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -100,6 +108,7 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -141,6 +150,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY= +github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= @@ -184,8 +195,8 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= -github.com/spiral/endure v1.0.0-beta8 h1:bKVe7F8CbvDZt8UYX3WoSV49OpFPHiM9Py55i7USPK8= -github.com/spiral/endure v1.0.0-beta8/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ= +github.com/spiral/endure v1.0.0-beta10 h1:eAFnJspvmMRDL3u7iwWK8eewogMhq4TTG3CjCBPnbeI= +github.com/spiral/endure v1.0.0-beta10/go.mod h1:mXFf8zPqr1SJ1cG0Zf59f6X+MvJzrdIwVjzQpa107e0= github.com/spiral/goridge/v2 v2.4.5 h1:rg4lLEJLrEh1Wj6G1qTsYVbYiQvig6mOR1F9GyDIGm8= github.com/spiral/goridge/v2 v2.4.5/go.mod h1:C/EZKFPON9lypi8QO7I5ObgVmrIzTmhZqFz/tmypcGc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -221,6 +232,7 @@ golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -228,6 +240,9 @@ golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm0 golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= +golang.org/x/image v0.0.0-20200119044424-58c23975cae1/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= +golang.org/x/image v0.0.0-20200927104501-e162460cd6b5 h1:QelT11PB4FXiDEXucrfNckHoFxwt8USGY1ajP1ZF5lM= +golang.org/x/image v0.0.0-20200927104501-e162460cd6b5/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= diff --git a/pipe_factory_test.go b/pipe_factory_test.go index 95eededa..ee2510f3 100755 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -101,7 +101,7 @@ func Test_Pipe_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -129,7 +129,7 @@ func Test_Pipe_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -178,7 +178,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(context.Background(), Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -205,7 +205,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/plugins/config/provider.go b/plugins/config/provider.go index 580231fd..ac33b3de 100755 --- a/plugins/config/provider.go +++ b/plugins/config/provider.go @@ -1,7 +1,7 @@ package config type Provider interface { - // Unmarshal configuration section into configuration object. + // UnmarshalKey reads configuration section into configuration object. // // func (h *HttpService) Init(cp config.Provider) error { // h.config := &HttpConfig{} diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go index c85a841f..14e60ac2 100755 --- a/plugins/config/tests/config_test.go +++ b/plugins/config/tests/config_test.go @@ -48,7 +48,7 @@ func TestViperProvider_Init(t *testing.T) { for { select { case e := <-errCh: - assert.NoError(t, e.Error.Err) + assert.NoError(t, e.Error) assert.NoError(t, container.Stop()) return case <-c: diff --git a/plugins/config/viper.go b/plugins/config/viper.go index 0c34313c..4e85af6b 100755 --- a/plugins/config/viper.go +++ b/plugins/config/viper.go @@ -14,6 +14,7 @@ type ViperProvider struct { Prefix string } +// Inits config provider. func (v *ViperProvider) Init() error { v.viper = viper.New() @@ -49,7 +50,7 @@ func (v *ViperProvider) Overwrite(values map[string]string) error { return nil } -// +// UnmarshalKey reads configuration section into configuration object. func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error { err := v.viper.UnmarshalKey(name, &out) if err != nil { diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go deleted file mode 100755 index 778b307d..00000000 --- a/plugins/events/broadcaster.go +++ /dev/null @@ -1,24 +0,0 @@ -package events - -type EventListener interface { - Handle(event interface{}) -} - -type EventBroadcaster struct { - listeners []EventListener -} - -func NewEventBroadcaster() *EventBroadcaster { - return &EventBroadcaster{} -} - -func (eb *EventBroadcaster) AddListener(l EventListener) { - // todo: threadcase - eb.listeners = append(eb.listeners, l) -} - -func (eb *EventBroadcaster) Push(e interface{}) { - for _, l := range eb.listeners { - l.Handle(e) - } -} diff --git a/plugins/factory/app.go b/plugins/factory/app.go index e4002963..4951e3df 100755 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -1,58 +1,76 @@ package factory import ( - "errors" + "context" "fmt" + "log" "os" "os/exec" "strings" - "time" + "github.com/fatih/color" + "github.com/spiral/endure/errors" "github.com/spiral/roadrunner/v2" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" ) -// AppConfig config combines factory, pool and cmd configurations. -type AppConfig struct { - Command string - User string - Group string - Env Env - - // Listen defines connection method and factory to be used to connect to workers: - // "pipes", "tcp://:6001", "unix://rr.sock" - // This config section must not change on re-configuration. - Relay string - - // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section - // must not change on re-configuration. - RelayTimeout time.Duration +const ServiceName = "app" + +type Env map[string]string + +// AppFactory creates workers for the application. +type AppFactory interface { + NewCmdFactory(env Env) (func() *exec.Cmd, error) + NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) + NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) } +// App manages worker type App struct { - cfg AppConfig - configProvider config.Provider + cfg Config + factory roadrunner.Factory } -func (app *App) Init(provider config.Provider) error { - app.cfg = AppConfig{} - app.configProvider = provider - - err := app.configProvider.UnmarshalKey("app", &app.cfg) +// Init application provider. +func (app *App) Init(cfg config.Provider) error { + err := cfg.UnmarshalKey(ServiceName, &app.cfg) if err != nil { return err } + app.cfg.InitDefaults() + + return nil +} + +// Name contains service name. +func (app *App) Name() string { + return ServiceName +} + +func (app *App) Serve() chan error { + errCh := make(chan error, 1) + var err error + + app.factory, err = app.initFactory() + if err != nil { + errCh <- errors.E(errors.Op("init factory"), err) + } + + return errCh +} - if app.cfg.Relay == "" { - app.cfg.Relay = "pipes" +func (app *App) Stop() error { + if app.factory == nil { + return nil } - return nil + return app.factory.Close(context.Background()) } -func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { +func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) { var cmdArgs []string + // create command according to the config cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...) @@ -75,15 +93,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { }, nil } -// todo ENV unused -func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { +func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + return app.factory.SpawnWorkerWithContext(ctx, spawnCmd()) +} + +func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { + spawnCmd, err := app.NewCmdFactory(env) + if err != nil { + return nil, err + } + + p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt) + if err != nil { + return nil, err + } + + p.AddListener(func(event interface{}) { + if we, ok := event.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } + }) + + return p, nil +} + +func (app *App) initFactory() (roadrunner.Factory, error) { if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { return roadrunner.NewPipeFactory(), nil } dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } lsn, err := util.CreateListener(app.cfg.Relay) @@ -98,7 +146,7 @@ func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil default: - return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") + return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)")) } } diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go deleted file mode 100755 index 024c5bea..00000000 --- a/plugins/factory/app_provider.go +++ /dev/null @@ -1,17 +0,0 @@ -package factory - -import ( - "os/exec" - - "github.com/spiral/roadrunner/v2" -) - -type Env map[string]string - -type Spawner interface { - // CmdFactory create new command factory with given env variables. - NewCmd(env Env) (func() *exec.Cmd, error) - - // NewFactory inits new factory for workers. - NewFactory(env Env) (roadrunner.Factory, error) -} diff --git a/plugins/factory/config.go b/plugins/factory/config.go new file mode 100755 index 00000000..b2d1d0ad --- /dev/null +++ b/plugins/factory/config.go @@ -0,0 +1,37 @@ +package factory + +import "time" + +// Config config combines factory, pool and cmd configurations. +type Config struct { + // Command to run as application. + Command string + + // User to run application under. + User string + + // Group to run application under. + Group string + + // Env represents application environment. + Env Env + + // Listen defines connection method and factory to be used to connect to workers: + // "pipes", "tcp://:6001", "unix://rr.sock" + // This config section must not change on re-configuration. + Relay string + + // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section + // must not change on re-configuration. Defaults to 60s. + RelayTimeout time.Duration +} + +func (cfg *Config) InitDefaults() { + if cfg.Relay == "" { + cfg.Relay = "pipes" + } + + if cfg.RelayTimeout == 0 { + cfg.RelayTimeout = time.Second * 60 + } +} diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go deleted file mode 100755 index 5d80682d..00000000 --- a/plugins/factory/factory.go +++ /dev/null @@ -1,76 +0,0 @@ -package factory - -import ( - "context" - - "log" - - "github.com/fatih/color" - "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/events" -) - -type WorkerFactory interface { - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) -} - -type WFactory struct { - spw Spawner - eb *events.EventBroadcaster -} - -func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) { - cmd, err := wf.spw.NewCmd(env) - if err != nil { - return nil, err - } - - factory, err := wf.spw.NewFactory(env) - if err != nil { - return nil, err - } - - p, err := roadrunner.NewPool(ctx, cmd, factory, opt) - if err != nil { - return nil, err - } - - // TODO event to stop - go func() { - for e := range p.Events() { - wf.eb.Push(e) - if we, ok := e.Payload.(roadrunner.WorkerEvent); ok { - if we.Event == roadrunner.EventWorkerLog { - log.Print(color.YellowString(string(we.Payload.([]byte)))) - } - } - } - }() - - return p, nil -} - -func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { - cmd, err := wf.spw.NewCmd(env) - if err != nil { - return nil, err - } - - wb, err := roadrunner.InitBaseWorker(cmd()) - if err != nil { - return nil, err - } - - return wb, nil -} - -func (wf *WFactory) Init(app Spawner) error { - wf.spw = app - wf.eb = events.NewEventBroadcaster() - return nil -} - -func (wf *WFactory) AddListener(l events.EventListener) { - wf.eb.AddListener(l) -} diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php deleted file mode 100755 index c6199449..00000000 --- a/plugins/factory/hello.php +++ /dev/null @@ -1 +0,0 @@ -<?php echo "hello -" . time();
\ No newline at end of file diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go index 5347083a..6c264fd6 100755 --- a/plugins/factory/tests/factory_test.go +++ b/plugins/factory/tests/factory_test.go @@ -31,11 +31,6 @@ func TestFactory(t *testing.T) { t.Fatal(err) } - err = container.Register(&factory.WFactory{}) - if err != nil { - t.Fatal(err) - } - err = container.Register(&Foo{}) if err != nil { t.Fatal(err) @@ -65,7 +60,7 @@ func TestFactory(t *testing.T) { for { select { case e := <-errCh: - assert.NoError(t, e.Error.Err) + assert.NoError(t, e.Error) assert.NoError(t, container.Stop()) return case <-c: diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go index 5ab6df73..9011bb00 100755 --- a/plugins/factory/tests/plugin_1.go +++ b/plugins/factory/tests/plugin_1.go @@ -10,10 +10,10 @@ import ( type Foo struct { configProvider config.Provider - spawner factory.Spawner + spawner factory.AppFactory } -func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { +func (f *Foo) Init(p config.Provider, spw factory.AppFactory) error { f.configProvider = p f.spawner = spw return nil @@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { func (f *Foo) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spawner.NewCmd(nil) + cmd, err := f.spawner.NewCmdFactory(nil) if err != nil { errCh <- err return errCh diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go index 2409627e..9f401bec 100755 --- a/plugins/factory/tests/plugin_2.go +++ b/plugins/factory/tests/plugin_2.go @@ -13,28 +13,26 @@ import ( type Foo2 struct { configProvider config.Provider - wf factory.WorkerFactory - spw factory.Spawner + wf factory.AppFactory } -func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error { +func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory) error { f.configProvider = p f.wf = workerFactory - f.spw = spawner return nil } func (f *Foo2) Serve() chan error { errCh := make(chan error, 1) - r := &factory.AppConfig{} + r := &factory.Config{} err := f.configProvider.UnmarshalKey("app", r) if err != nil { errCh <- err return errCh } - cmd, err := f.spw.NewCmd(nil) + cmd, err := f.wf.NewCmdFactory(nil) if err != nil { errCh <- err return errCh @@ -58,16 +56,18 @@ func (f *Foo2) Serve() chan error { _ = w - poolConfig := &roadrunner.Config{ + poolConfig := roadrunner.Config{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, DestroyTimeout: time.Second * 10, - TTL: 1000, - IdleTTL: 1000, - ExecTTL: time.Second * 10, - MaxPoolMemory: 10000, - MaxWorkerMemory: 10000, + Supervisor: roadrunner.SupervisorConfig{ + WatchTick: 60, + TTL: 1000, + IdleTTL: 10, + ExecTTL: time.Second * 10, + MaxWorkerMemory: 1000, + }, } pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil) diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go index 1039ee5e..719fd5e3 100755 --- a/plugins/rpc/config.go +++ b/plugins/rpc/config.go @@ -12,6 +12,9 @@ import ( type Config struct { // Listen string Listen string + + // Disabled disables RPC service. + Disabled bool } // InitDefaults allows to init blank config with pre-defined set of default values. diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go index 6568eea3..0f6c9753 100755 --- a/plugins/rpc/rpc.go +++ b/plugins/rpc/rpc.go @@ -1,21 +1,24 @@ package rpc import ( - "errors" + "net/rpc" + "github.com/spiral/endure" + "github.com/spiral/endure/errors" "github.com/spiral/goridge/v2" "github.com/spiral/roadrunner/v2/plugins/config" - - "net/rpc" ) -type PluginRpc interface { - Name() string - RpcService() (interface{}, error) +// RPCPluggable declares the ability to create set of public RPC methods. +type RPCPluggable interface { + endure.Named + + // Provides RPC methods for the given service. + RPCService() (interface{}, error) } -// ID contains default service name. -const ID = "rpc" +// ServiceName contains default service name. +const ServiceName = "rpc" type services struct { service interface{} @@ -24,52 +27,48 @@ type services struct { // Service is RPC service. type Service struct { - // TODO do we need a pointer here since all receivers are pointers?? - rpc *rpc.Server - configProvider config.Provider - services []services - config Config - close chan struct{} + rpc *rpc.Server + services []services + config Config + close chan struct{} } // Init rpc service. Must return true if service is enabled. func (s *Service) Init(cfg config.Provider) error { - s.configProvider = cfg - err := s.configProvider.UnmarshalKey(ID, &s.config) + if !cfg.Has(ServiceName) { + return errors.E(errors.Disabled) + } + + err := cfg.UnmarshalKey(ServiceName, &s.config) if err != nil { return err } + s.config.InitDefaults() - // TODO Do we need to init defaults - if s.config.Listen == "" { - s.config.InitDefaults() + if s.config.Disabled { + return errors.E(errors.Disabled) } - s.close = make(chan struct{}) + return s.config.Valid() +} - return nil +// Name contains service name. +func (s *Service) Name() string { + return ServiceName } // Serve serves the service. func (s *Service) Serve() chan error { + s.close = make(chan struct{}, 1) errCh := make(chan error, 1) - server := rpc.NewServer() - if server == nil { - errCh <- errors.New("rpc server is nil") - return errCh - } - s.rpc = server - if len(s.services) == 0 { - errCh <- errors.New("no services with RPC") - return errCh - } + s.rpc = rpc.NewServer() // Attach all services for i := 0; i < len(s.services); i++ { err := s.Register(s.services[i].name, s.services[i].service) if err != nil { - errCh <- err + errCh <- errors.E(errors.Op("register service"), err) return errCh } } @@ -85,7 +84,10 @@ func (s *Service) Serve() chan error { select { case <-s.close: // log error - errCh <- ln.Close() + err := ln.Close() + if err != nil { + errCh <- errors.E(errors.Op("close RPC socket"), err) + } return default: conn, err := ln.Accept() @@ -98,7 +100,7 @@ func (s *Service) Serve() chan error { } }() - return nil + return errCh } // Stop stops the service. @@ -109,12 +111,12 @@ func (s *Service) Stop() error { func (s *Service) Depends() []interface{} { return []interface{}{ - s.RpcService, + s.RegisterService, } } -func (s *Service) RpcService(p PluginRpc) error { - service, err := p.RpcService() +func (s *Service) RegisterService(p RPCPluggable) error { + service, err := p.RPCService() if err != nil { return err } @@ -136,7 +138,7 @@ func (s *Service) RpcService(p PluginRpc) error { // no suitable methods. It also logs the error using package log. func (s *Service) Register(name string, svc interface{}) error { if s.rpc == nil { - return errors.New("RPC service is not configured") + return errors.E("RPC service is not configured") } return s.rpc.RegisterName(name, svc) @@ -144,10 +146,6 @@ func (s *Service) Register(name string, svc interface{}) error { // Client creates new RPC client. func (s *Service) Client() (*rpc.Client, error) { - if s.configProvider == nil { - return nil, errors.New("RPC service is not configured") - } - conn, err := s.config.Dialer() if err != nil { return nil, err diff --git a/plugins/rpc/rpc_test.go b/plugins/rpc/rpc_test.go deleted file mode 100755 index 9ab1e3e8..00000000 --- a/plugins/rpc/rpc_test.go +++ /dev/null @@ -1 +0,0 @@ -package rpc @@ -4,49 +4,52 @@ import ( "context" "runtime" "time" + + "github.com/spiral/roadrunner/v2/util" ) +// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log. +type PoolEvent struct { + // Event type, see below. + Event int64 + + // Payload depends on event type, typically it's worker or error. + Payload interface{} +} + const ( // EventWorkerConstruct thrown when new worker is spawned. - EventWorkerConstruct = iota + 100 + EventWorkerConstruct = iota + 7800 // EventWorkerDestruct thrown after worker destruction. EventWorkerDestruct - // EventWorkerKill thrown after worker is being forcefully killed. - EventWorkerKill - - // EventWorkerError thrown any worker related even happen (passed with WorkerError) - EventWorkerEvent - - // EventWorkerDead thrown when worker stops worker for any reason. - EventWorkerDead - - // EventPoolError caused on pool wide errors + // EventPoolError caused on pool wide errors. EventPoolError -) -const ( - // EventMaxMemory caused when worker consumes more memory than allowed. - EventMaxMemory = iota + 8000 + // EventSupervisorError triggered when supervisor can not complete work. + EventSupervisorError - // EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError + // todo: EventMaxMemory caused when worker consumes more memory than allowed. + EventMaxMemory + + // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError EventTTL - // EventIdleTTL triggered when worker spends too much time at rest. + // todo: EventIdleTTL triggered when worker spends too much time at rest. EventIdleTTL - // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). + // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). EventExecTTL ) // Pool managed set of inner worker processes. type Pool interface { - // ATTENTION, YOU SHOULD CONSUME EVENTS, OTHERWISE POOL WILL BLOCK - Events() chan PoolEvent + // AddListener connects event listener to the pool. + AddListener(listener util.EventListener) - // Exec one task with given payload and context, returns result or error. - ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) + // GetConfig returns pool configuration. + GetConfig() Config // Exec Exec(rqs Payload) (Payload, error) @@ -54,18 +57,14 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []WorkerBase) + // Remove worker from the pool. RemoveWorker(ctx context.Context, worker WorkerBase) error - Config() Config - // Destroy all underlying stack (but let them to complete the task). Destroy(ctx context.Context) } -// todo: merge with pool options - -// Config defines basic behaviour of worker creation and handling process. -// +// Configures the pool behaviour. type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. @@ -84,20 +83,8 @@ type Config struct { // properly destroy, if timeout reached worker will be killed. Defaults to 60s. DestroyTimeout time.Duration - // TTL defines maximum time worker is allowed to live. - TTL int64 - - // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. - IdleTTL int64 - - // ExecTTL defines maximum lifetime per job. - ExecTTL time.Duration - - // MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes. - MaxPoolMemory uint64 - - // MaxWorkerMemory limits memory per worker. - MaxWorkerMemory uint64 + // Supervision config to limit worker and pool memory usage. + Supervisor SupervisorConfig } // InitDefaults enables default config values. @@ -113,4 +100,30 @@ func (cfg *Config) InitDefaults() { if cfg.DestroyTimeout == 0 { cfg.DestroyTimeout = time.Minute } + + cfg.Supervisor.InitDefaults() +} + +type SupervisorConfig struct { + // WatchTick defines how often to check the state of worker. + WatchTick time.Duration + + // TTL defines maximum time worker is allowed to live. + TTL int64 + + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. + IdleTTL int64 + + // ExecTTL defines maximum lifetime per job. + ExecTTL time.Duration + + // MaxWorkerMemory limits memory per worker. + MaxWorkerMemory uint64 +} + +// InitDefaults enables default config values. +func (cfg *SupervisorConfig) InitDefaults() { + if cfg.WatchTick == 0 { + cfg.WatchTick = time.Second + } } diff --git a/pool_supervisor.go b/pool_supervisor.go deleted file mode 100755 index c0a6ecd9..00000000 --- a/pool_supervisor.go +++ /dev/null @@ -1,182 +0,0 @@ -package roadrunner - -import ( - "context" - "errors" - "fmt" - "time" -) - -const MB = 1024 * 1024 - -type Supervisor interface { - Attach(pool Pool) - StartWatching() error - StopWatching() - Detach() -} - -type staticPoolSupervisor struct { - // maxWorkerMemory in MB - maxWorkerMemory uint64 - // maxPoolMemory in MB - maxPoolMemory uint64 - // maxWorkerTTL in seconds - maxWorkerTTL uint64 - // maxWorkerIdle in seconds - maxWorkerIdle uint64 - - // watchTimeout in seconds - watchTimeout uint64 - stopCh chan struct{} - - pool Pool -} - -/* -The arguments are: -maxWorkerMemory - maximum memory allowed for a single worker -maxPoolMemory - maximum pool memory allowed for a pool of a workers -maxTtl - maximum ttl for the worker after which it will be killed and replaced -maxIdle - maximum time to live for the worker in Ready state -watchTimeout - time between watching for the workers/pool status -*/ -// TODO might be just wrap the pool and return ControlledPool with included Pool interface -func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, watchTimeout uint64) Supervisor { - if maxWorkerMemory == 0 { - // just set to a big number, 5GB - maxPoolMemory = 5000 * MB - } - - if watchTimeout == 0 { - watchTimeout = 60 - } - - return &staticPoolSupervisor{ - maxWorkerMemory: maxWorkerMemory, - maxPoolMemory: maxPoolMemory, - maxWorkerTTL: maxTtl, - maxWorkerIdle: maxIdle, - watchTimeout: watchTimeout, - stopCh: make(chan struct{}), - } -} - -func (sps *staticPoolSupervisor) Attach(pool Pool) { - sps.pool = pool -} - -func (sps *staticPoolSupervisor) StartWatching() error { - go func() { - watchTout := time.NewTicker(time.Second * time.Duration(sps.watchTimeout)) - for { - select { - case <-sps.stopCh: - watchTout.Stop() - return - // stop here - case <-watchTout.C: - err := sps.control() - if err != nil { - sps.pool.Events() <- PoolEvent{Payload: err} - } - } - } - }() - return nil -} - -func (sps *staticPoolSupervisor) StopWatching() { - sps.stopCh <- struct{}{} -} - -func (sps *staticPoolSupervisor) Detach() { - -} - -func (sps *staticPoolSupervisor) control() error { - if sps.pool == nil { - return errors.New("pool should be attached") - } - now := time.Now() - ctx := context.TODO() - - // THIS IS A COPY OF WORKERS - workers := sps.pool.Workers() - totalUsedMemory := uint64(0) - - for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == StateInvalid { - continue - } - - s, err := WorkerProcessState(workers[i]) - if err != nil { - err2 := sps.pool.RemoveWorker(ctx, workers[i]) - if err2 != nil { - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)} - return fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2) - } - sps.pool.Events() <- PoolEvent{Payload: err} - return err - } - - if sps.maxWorkerTTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sps.maxWorkerTTL) { - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - - // after remove worker we should exclude it from further analysis - workers = append(workers[:i], workers[i+1:]...) - } - - if sps.maxWorkerMemory != 0 && s.MemoryUsage >= sps.maxWorkerMemory*MB { - // TODO events - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sps.maxWorkerMemory)} - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - workers = append(workers[:i], workers[i+1:]...) - continue - } - - // firs we check maxWorker idle - if sps.maxWorkerIdle != 0 { - // then check for the worker state - if workers[i].State().Value() != StateReady { - continue - } - /* - Calculate idle time - If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 - 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle - we are guessing that worker overlap idle time and has to be killed - */ - // get last used unix nano - lu := workers[i].State().LastUsed() - // convert last used to unixNano and sub time.now - res := int64(lu) - now.UnixNano() - // maxWorkerIdle more than diff between now and last used - if int64(sps.maxWorkerIdle)-res <= 0 { - sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed worker idle time elapsed. actual idle time: %v, max idle time: %v", sps.maxWorkerIdle, res)} - err = sps.pool.RemoveWorker(ctx, workers[i]) - if err != nil { - return err - } - workers = append(workers[:i], workers[i+1:]...) - } - } - - // the very last step is to calculate pool memory usage (except excluded workers) - totalUsedMemory += s.MemoryUsage - } - - // if current usage more than max allowed pool memory usage - if totalUsedMemory > sps.maxPoolMemory { - sps.pool.Destroy(ctx) - } - - return nil -} diff --git a/socket_factory.go b/socket_factory.go index 27558cce..ed151f2d 100755 --- a/socket_factory.go +++ b/socket_factory.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/shirou/gopsutil/process" + "github.com/pkg/errors" "github.com/spiral/goridge/v2" "go.uber.org/multierr" @@ -110,6 +112,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm w.Kill(context.Background()), w.Wait(context.Background()), ) + c <- socketSpawn{ w: nil, err: err, @@ -178,10 +181,16 @@ func (f *SocketFactory) Close(ctx context.Context) error { // waits for WorkerProcess to connect over socket and returns associated relay of timeout func (f *SocketFactory) findRelayWithContext(ctx context.Context, w WorkerBase) (*goridge.SocketRelay, error) { + ticker := time.NewTicker(time.Millisecond * 100) for { select { case <-ctx.Done(): return nil, ctx.Err() + case <-ticker.C: + _, err := process.NewProcess(int32(w.Pid())) + if err != nil { + return nil, err + } default: tmp, ok := f.relays.Load(w.Pid()) if !ok { diff --git a/socket_factory_test.go b/socket_factory_test.go index cfb95ca1..6ab87872 100755 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -98,28 +98,29 @@ func Test_Tcp_StartError(t *testing.T) { assert.Nil(t, w) } -// func Test_Tcp_Failboot(t *testing.T) { -// time.Sleep(time.Millisecond * 10) // to ensure free socket -// -// ls, err := net.Listen("tcp", "localhost:9007") -// if assert.NoError(t, err) { -// defer func() { -// err3 := ls.Close() -// if err3 != nil { -// t.Errorf("error closing the listener: error %v", err3) -// } -// }() -// } else { -// t.Skip("socket is busy") -// } -// -// cmd := exec.Command("php", "tests/failboot.php") -// -// w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(cmd) -// assert.Nil(t, w) -// assert.Error(t, err2) -// assert.Contains(t, err2.Error(), "failboot") -//} +func Test_Tcp_Failboot(t *testing.T) { + time.Sleep(time.Millisecond * 10) // to ensure free socket + ctx := context.Background() + + ls, err := net.Listen("tcp", "localhost:9007") + if assert.NoError(t, err) { + defer func() { + err3 := ls.Close() + if err3 != nil { + t.Errorf("error closing the listener: error %v", err3) + } + }() + } else { + t.Skip("socket is busy") + } + + cmd := exec.Command("php", "tests/failboot.php") + + w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + assert.Nil(t, w) + assert.Error(t, err2) + assert.Contains(t, err2.Error(), "failboot") +} func Test_Tcp_Timeout(t *testing.T) { time.Sleep(time.Millisecond * 10) // to ensure free socket @@ -161,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) { cmd := exec.Command("php", "tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -208,7 +209,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -248,7 +249,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -304,7 +305,7 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*2).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "failboot") @@ -393,7 +394,7 @@ func Test_Unix_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) @@ -436,7 +437,7 @@ func Test_Unix_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := sw.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -512,7 +513,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -580,7 +581,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/static_pool.go b/static_pool.go index 0c2352ad..4ecbdd41 100755 --- a/static_pool.go +++ b/static_pool.go @@ -6,20 +6,19 @@ import ( "os/exec" "sync" + "github.com/spiral/roadrunner/v2/util" + "github.com/pkg/errors" ) -const ( - // StopRequest can be sent by worker to indicate that restart is required. - StopRequest = "{\"stop\":true}" -) +// StopRequest can be sent by worker to indicate that restart is required. +const StopRequest = "{\"stop\":true}" var bCtx = context.Background() // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { - // pool behaviour - cfg *Config + cfg Config // worker command creator cmd func() *exec.Cmd @@ -27,30 +26,31 @@ type StaticPool struct { // creates and connects to stack factory Factory + // distributes the events + events *util.EventHandler + // protects state of worker list, does not affect allocation muw sync.RWMutex - ww *WorkersWatcher + // manages worker states and TTLs + ww *workerWatcher - events chan PoolEvent -} -type PoolEvent struct { - Payload interface{} + // supervises memory and TTL of workers + // sp *supervisedPool } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -// TODO why cfg is passed by pointer? -func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) { +func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) { cfg.InitDefaults() p := &StaticPool{ cfg: cfg, cmd: cmd, factory: factory, - events: make(chan PoolEvent), + events: &util.EventHandler{}, } - p.ww = NewWorkerWatcher(func(args ...interface{}) (WorkerBase, error) { + p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) { w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd()) if err != nil { return nil, err @@ -74,12 +74,21 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co return nil, err } + // todo: implement + // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor) + // p.sp.Start() + return p, nil } +// AddListener connects event listener to the pool. +func (p *StaticPool) AddListener(listener util.EventListener) { + p.events.AddListener(listener) +} + // Config returns associated pool configuration. Immutable. -func (p *StaticPool) Config() Config { - return *p.cfg +func (p *StaticPool) GetConfig() Config { + return p.cfg } // Workers returns worker list associated with the pool. @@ -103,18 +112,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { rsp, err := sw.Exec(rqs) if err != nil { - errJ := p.checkMaxJobs(bCtx, w) - if errJ != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) - } // soft job errors are allowed - if _, jobError := err.(TaskError); jobError { - p.ww.PushWorker(w) + if _, jobError := err.(JobError); jobError { + if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { + err := p.ww.AllocateNew(bCtx) + if err != nil { + p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + } + + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + } + } else { + p.ww.PushWorker(w) + } + return EmptyPayload, err } sw.State().Set(StateInvalid) + p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) + if errS != nil { return EmptyPayload, fmt.Errorf("%v, %v", err, errS) } @@ -127,9 +148,10 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - return EmptyPayload, err + p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } - return p.ExecWithContext(bCtx, rqs) + + return p.Exec(rqs) } if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { @@ -146,81 +168,81 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { } // Exec one task with given payload and context, returns result or error. -func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - // todo: why TODO passed here? - getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) - defer cancel() - w, err := p.ww.GetFreeWorker(getWorkerCtx) - if err != nil && errors.Is(err, ErrWatcherStopped) { - return EmptyPayload, ErrWatcherStopped - } else if err != nil { - return EmptyPayload, err - } - - sw := w.(SyncWorker) - - var execCtx context.Context - if p.cfg.ExecTTL != 0 { - var cancel2 context.CancelFunc - execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL) - defer cancel2() - } else { - execCtx = ctx - } - - rsp, err := sw.ExecWithContext(execCtx, rqs) - if err != nil { - errJ := p.checkMaxJobs(ctx, w) - if errJ != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) - } - // soft job errors are allowed - if _, jobError := err.(TaskError); jobError { - p.ww.PushWorker(w) - return EmptyPayload, err - } - - sw.State().Set(StateInvalid) - errS := w.Stop(ctx) - if errS != nil { - return EmptyPayload, fmt.Errorf("%v, %v", err, errS) - } - - return EmptyPayload, err - } - - // worker want's to be terminated - if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - w.State().Set(StateInvalid) - err = w.Stop(ctx) - if err != nil { - return EmptyPayload, err - } - return p.ExecWithContext(ctx, rqs) - } - - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err = p.ww.AllocateNew(ctx) - if err != nil { - return EmptyPayload, err - } - } else { - p.muw.Lock() - p.ww.PushWorker(w) - p.muw.Unlock() - } - return rsp, nil -} +// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { +// // todo: why TODO passed here? +// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) +// defer cancel() +// w, err := p.ww.GetFreeWorker(getWorkerCtx) +// if err != nil && errors.Is(err, ErrWatcherStopped) { +// return EmptyPayload, ErrWatcherStopped +// } else if err != nil { +// return EmptyPayload, err +// } +// +// sw := w.(SyncWorker) +// +// // todo: implement worker destroy +// //execCtx context.Context +// //if p.cfg.Supervisor.ExecTTL != 0 { +// // var cancel2 context.CancelFunc +// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL) +// // defer cancel2() +// //} else { +// // execCtx = ctx +// //} +// +// rsp, err := sw.Exec(rqs) +// if err != nil { +// errJ := p.checkMaxJobs(ctx, w) +// if errJ != nil { +// // todo: worker was not destroyed +// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) +// } +// +// // soft job errors are allowed +// if _, jobError := err.(JobError); jobError { +// p.ww.PushWorker(w) +// return EmptyPayload, err +// } +// +// sw.State().Set(StateInvalid) +// errS := w.Stop(ctx) +// if errS != nil { +// return EmptyPayload, fmt.Errorf("%v, %v", err, errS) +// } +// +// return EmptyPayload, err +// } +// +// // worker want's to be terminated +// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { +// w.State().Set(StateInvalid) +// err = w.Stop(ctx) +// if err != nil { +// return EmptyPayload, err +// } +// return p.ExecWithContext(ctx, rqs) +// } +// +// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { +// err = p.ww.AllocateNew(ctx) +// if err != nil { +// return EmptyPayload, err +// } +// } else { +// p.muw.Lock() +// p.ww.PushWorker(w) +// p.muw.Unlock() +// } +// +// return rsp, nil +// } // Destroy all underlying stack (but let them to complete the task). func (p *StaticPool) Destroy(ctx context.Context) { p.ww.Destroy(ctx) } -func (p *StaticPool) Events() chan PoolEvent { - return p.events -} - // allocate required number of stack func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { var workers []WorkerBase @@ -243,6 +265,7 @@ func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { err := p.ww.AllocateNew(ctx) if err != nil { + p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) return err } } diff --git a/static_pool_test.go b/static_pool_test.go index ce9e6820..ec80e92a 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -2,7 +2,6 @@ package roadrunner import ( "context" - "fmt" "log" "os/exec" "runtime" @@ -18,7 +17,6 @@ var cfg = Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, } func Test_NewPool(t *testing.T) { @@ -27,12 +25,10 @@ func Test_NewPool(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) - assert.Equal(t, cfg, p.Config()) - defer p.Destroy(ctx) assert.NotNil(t, p) @@ -43,7 +39,7 @@ func Test_StaticPool_Invalid(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") }, NewPipeFactory(), - &cfg, + cfg, ) assert.Nil(t, p) @@ -55,7 +51,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -71,7 +67,7 @@ func Test_StaticPool_Echo(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -79,7 +75,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -95,7 +91,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -103,7 +99,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: nil}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -119,7 +115,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) @@ -127,7 +123,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -143,20 +139,20 @@ func Test_StaticPool_JobError(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, TaskError{}, err) + assert.IsType(t, JobError{}, err) assert.Equal(t, "hello", err.Error()) } @@ -167,7 +163,7 @@ func Test_StaticPool_JobError(t *testing.T) { // ctx, // func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") }, // NewPipeFactory(), -// &cfg, +// cfg, // ) // assert.NoError(t, err) // assert.NotNil(t, p) @@ -177,6 +173,10 @@ func Test_StaticPool_JobError(t *testing.T) { // var i int64 // atomic.StoreInt64(&i, 10) // +// p.AddListener(func(event interface{}) { +// +// }) +// // go func() { // for { // select { @@ -197,7 +197,7 @@ func Test_StaticPool_JobError(t *testing.T) { // wg.Wait() // // p.Destroy(ctx) -//} +// } // func Test_StaticPool_Broken_FromOutside(t *testing.T) { @@ -206,14 +206,14 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) assert.NoError(t, err) defer p.Destroy(ctx) assert.NotNil(t, p) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -226,17 +226,13 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { // Consume pool events wg := sync.WaitGroup{} wg.Add(1) - go func() { - for true { - select { - case ev := <-p.Events(): - fmt.Println(ev) - if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct { - wg.Done() - } + p.AddListener(func(event interface{}) { + if pe, ok := event.(PoolEvent); ok { + if pe.Event == EventWorkerConstruct { + wg.Done() } } - }() + }) // killing random worker and expecting pool to replace it err = p.Workers()[0].Kill(ctx) @@ -258,11 +254,10 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, DestroyTimeout: time.Second * 2, - ExecTTL: time.Second * 4, }, ) assert.Error(t, err) @@ -275,12 +270,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) assert.NoError(t, err) @@ -291,11 +285,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, _ := p.Exec(Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -314,11 +308,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 15, }, ) assert.NoError(t, err) @@ -326,26 +319,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { assert.NotNil(t, p) - go func() { - for { - select { - case ev := <-p.Events(): - fmt.Println(ev) - } - } - }() - var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -364,11 +348,10 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) @@ -376,7 +359,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -387,11 +370,10 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 4, }, ) @@ -399,7 +381,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err := p.Exec(Payload{Body: []byte("100")}) if err != nil { t.Errorf("error executing payload: error %v", err) } @@ -407,7 +389,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 10) p.Destroy(ctx) - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) + _, err = p.Exec(Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -418,11 +400,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) assert.NoError(t, err) @@ -434,7 +415,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { w.State().Set(StateErrored) } - _, err = p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = p.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) } @@ -444,11 +425,10 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 5, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) @@ -464,7 +444,7 @@ func Benchmark_Pool_Echo(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &cfg, + cfg, ) if err != nil { b.Fatal(err) @@ -473,7 +453,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -486,11 +466,10 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) defer p.Destroy(ctx) @@ -500,7 +479,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -517,12 +496,11 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { ctx, func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, NewPipeFactory(), - &Config{ + Config{ NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, DestroyTimeout: time.Second, - ExecTTL: time.Second * 5, }, ) defer p.Destroy(ctx) @@ -530,7 +508,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/supervisor_pool.go b/supervisor_pool.go new file mode 100755 index 00000000..9d1d2b1e --- /dev/null +++ b/supervisor_pool.go @@ -0,0 +1,130 @@ +package roadrunner + +import ( + "context" + "time" + + "github.com/spiral/roadrunner/v2/util" +) + +const MB = 1024 * 1024 + +type SupervisedPool interface { + Pool + + // ExecWithContext provides the ability to execute with time deadline. Attention, worker will be destroyed if context + // deadline reached. + ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) +} + +type supervisedPool struct { + cfg SupervisorConfig + events *util.EventHandler + pool Pool + stopCh chan struct{} +} + +func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool { + return &supervisedPool{ + cfg: cfg, + events: events, + pool: pool, + stopCh: make(chan struct{}), + } +} + +func (sp *supervisedPool) Start() { + go func() { + watchTout := time.NewTicker(sp.cfg.WatchTick) + for { + select { + case <-sp.stopCh: + watchTout.Stop() + return + // stop here + case <-watchTout.C: + sp.control() + } + } + }() +} + +func (sp *supervisedPool) Stop() { + sp.stopCh <- struct{}{} +} + +func (sp *supervisedPool) control() { + now := time.Now() + ctx := context.TODO() + + // THIS IS A COPY OF WORKERS + workers := sp.pool.Workers() + + for i := 0; i < len(workers); i++ { + if workers[i].State().Value() == StateInvalid { + continue + } + + s, err := WorkerProcessState(workers[i]) + if err != nil { + // worker not longer valid for supervision + continue + } + + if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) { + err = sp.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + return + } else { + sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) + } + + continue + } + + if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + err = sp.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + return + } else { + sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]}) + } + + continue + } + + // firs we check maxWorker idle + if sp.cfg.IdleTTL != 0 { + // then check for the worker state + if workers[i].State().Value() != StateReady { + continue + } + + /* + Calculate idle time + If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64 + 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle + we are guessing that worker overlap idle time and has to be killed + */ + + // get last used unix nano + lu := workers[i].State().LastUsed() + + // convert last used to unixNano and sub time.now + res := int64(lu) - now.UnixNano() + + // maxWorkerIdle more than diff between now and last used + if sp.cfg.IdleTTL-res <= 0 { + err = sp.pool.RemoveWorker(ctx, workers[i]) + if err != nil { + sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err}) + return + } else { + sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]}) + } + } + } + } +} diff --git a/sync_worker.go b/sync_worker.go index de9491d6..d7c15e88 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/spiral/roadrunner/v2/util" + "github.com/pkg/errors" "github.com/spiral/goridge/v2" ) @@ -14,90 +16,24 @@ var EmptyPayload = Payload{} type SyncWorker interface { // WorkerBase provides basic functionality for the SyncWorker WorkerBase + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs Payload) (Payload, error) - - // ExecWithContext allow to set ExecTTL - ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) } -type taskWorker struct { +type syncWorker struct { w WorkerBase } func NewSyncWorker(w WorkerBase) (SyncWorker, error) { - return &taskWorker{ + return &syncWorker{ w: w, }, nil } -type twexec struct { - payload Payload - err error -} - -func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - c := make(chan twexec) - go func() { - if len(rqs.Body) == 0 && len(rqs.Context) == 0 { - c <- twexec{ - payload: EmptyPayload, - err: fmt.Errorf("payload can not be empty"), - } - return - } - - if tw.w.State().Value() != StateReady { - c <- twexec{ - payload: EmptyPayload, - err: fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()), - } - return - } - - // set last used time - tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) - tw.w.State().Set(StateWorking) - - rsp, err := tw.execPayload(rqs) - if err != nil { - if _, ok := err.(TaskError); !ok { - tw.w.State().Set(StateErrored) - tw.w.State().RegisterExec() - } - c <- twexec{ - payload: EmptyPayload, - err: err, - } - return - } - - tw.w.State().Set(StateReady) - tw.w.State().RegisterExec() - c <- twexec{ - payload: rsp, - err: nil, - } - return - }() - - for { - select { - case <-ctx.Done(): - return EmptyPayload, ctx.Err() - case res := <-c: - if res.err != nil { - return EmptyPayload, res.err - } - - return res.payload, nil - } - } -} - -// -func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { - if len(rqs.Body) == 0 && len(rqs.Context) == 0 { +// Exec payload without TTL timeout. +func (tw *syncWorker) Exec(p Payload) (Payload, error) { + if len(p.Body) == 0 && len(p.Context) == 0 { return EmptyPayload, fmt.Errorf("payload can not be empty") } @@ -109,9 +45,9 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) tw.w.State().Set(StateWorking) - rsp, err := tw.execPayload(rqs) + rsp, err := tw.execPayload(p) if err != nil { - if _, ok := err.(TaskError); !ok { + if _, ok := err.(JobError); !ok { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -124,7 +60,7 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { return rsp, nil } -func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { +func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) { // two things; todo: merge if err := sendControl(tw.w.Relay(), rqs.Context); err != nil { return EmptyPayload, errors.Wrap(err, "header error") @@ -147,7 +83,7 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { } if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, TaskError(rsp.Context) + return EmptyPayload, JobError(rsp.Context) } // add streaming support :) @@ -158,46 +94,46 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { return rsp, nil } -func (tw *taskWorker) String() string { +func (tw *syncWorker) String() string { return tw.w.String() } -func (tw *taskWorker) Created() time.Time { - return tw.w.Created() +func (tw *syncWorker) Pid() int64 { + return tw.w.Pid() } -func (tw *taskWorker) Events() <-chan WorkerEvent { - return tw.w.Events() +func (tw *syncWorker) Created() time.Time { + return tw.w.Created() } -func (tw *taskWorker) Pid() int64 { - return tw.w.Pid() +func (tw *syncWorker) AddListener(listener util.EventListener) { + tw.w.AddListener(listener) } -func (tw *taskWorker) State() State { +func (tw *syncWorker) State() State { return tw.w.State() } -func (tw *taskWorker) Start() error { +func (tw *syncWorker) Start() error { return tw.w.Start() } -func (tw *taskWorker) Wait(ctx context.Context) error { +func (tw *syncWorker) Wait(ctx context.Context) error { return tw.w.Wait(ctx) } -func (tw *taskWorker) Stop(ctx context.Context) error { +func (tw *syncWorker) Stop(ctx context.Context) error { return tw.w.Stop(ctx) } -func (tw *taskWorker) Kill(ctx context.Context) error { +func (tw *syncWorker) Kill(ctx context.Context) error { return tw.w.Kill(ctx) } -func (tw *taskWorker) Relay() goridge.Relay { +func (tw *syncWorker) Relay() goridge.Relay { return tw.w.Relay() } -func (tw *taskWorker) AttachRelay(rl goridge.Relay) { +func (tw *syncWorker) AttachRelay(rl goridge.Relay) { tw.w.AttachRelay(rl) } diff --git a/sync_worker_test.go b/sync_worker_test.go index f4868009..7f969283 100755 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -2,11 +2,9 @@ package roadrunner import ( "context" - "errors" "os/exec" "sync" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -34,7 +32,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -65,7 +63,7 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := syncWorker.ExecWithContext(ctx, EmptyPayload) + res, err := syncWorker.Exec(EmptyPayload) assert.Error(t, err) assert.Nil(t, res.Body) @@ -84,7 +82,6 @@ func Test_NotStarted_String(t *testing.T) { } func Test_NotStarted_Exec(t *testing.T) { - ctx := context.Background() cmd := exec.Command("php", "tests/client.php", "echo", "pipes") w, _ := InitBaseWorker(cmd) @@ -94,7 +91,7 @@ func Test_NotStarted_Exec(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -143,7 +140,7 @@ func Test_Echo_Slow(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -164,28 +161,17 @@ func Test_Broken(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - go func() { - assert.NotNil(t, w) - tt := time.NewTimer(time.Second * 10) - defer wg.Done() - for { - select { - case ev := <-w.Events(): - assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()") - return - case <-tt.C: - assert.Error(t, errors.New("no events from worker")) - return - } - } - }() + w.AddListener(func(event interface{}) { + assert.Contains(t, string(event.(WorkerEvent).Payload.([]byte)), "undefined_function()") + wg.Done() + }) syncWorker, err := NewSyncWorker(w) if err != nil { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -215,12 +201,12 @@ func Test_Error(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.Exec(Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, TaskError{}, err) + assert.IsType(t, JobError{}, err) assert.Equal(t, "hello", err.Error()) } @@ -244,19 +230,19 @@ func Test_NumExecs(t *testing.T) { t.Fatal(err) } - _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(1), w.State().NumExecs()) - _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(2), w.State().NumExecs()) - _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.Exec(Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/util/events.go b/util/events.go new file mode 100755 index 00000000..9e12c4f7 --- /dev/null +++ b/util/events.go @@ -0,0 +1,26 @@ +package util + +// Event listener listens for the events produced by worker, worker pool or other servce. +type EventListener func(event interface{}) + +// EventHandler helps to broadcast events to multiple listeners. +type EventHandler struct { + listeners []EventListener +} + +// NumListeners returns number of event listeners. +func (eb *EventHandler) NumListeners() int { + return len(eb.listeners) +} + +// AddListener registers new event listener. +func (eb *EventHandler) AddListener(listener EventListener) { + eb.listeners = append(eb.listeners, listener) +} + +// Push broadcast events across all event listeners. +func (eb *EventHandler) Push(e interface{}) { + for _, listener := range eb.listeners { + listener(e) + } +} @@ -11,10 +11,18 @@ import ( "sync" "time" + "github.com/spiral/roadrunner/v2/util" + "github.com/spiral/goridge/v2" "go.uber.org/multierr" ) +const ( + // WaitDuration - for how long error buffer should attempt to aggregate error messages + // before merging output together since lastError update (required to keep error update together). + WaitDuration = 25 * time.Millisecond +) + // EventWorkerKill thrown after WorkerProcess is being forcefully killed. const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. @@ -22,38 +30,31 @@ const ( // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog - - // EventWorkerWaitDone triggered when worker exit from process Wait - EventWorkerWaitDone // todo: implemented? - - EventWorkerBufferClosed - - EventRelayCloseError - - EventWorkerProcessError ) -const ( - // WaitDuration - for how long error buffer should attempt to aggregate error messages - // before merging output together since lastError update (required to keep error update together). - WaitDuration = 100 * time.Millisecond -) - -// todo: write comment +// WorkerEvent wraps worker events. type WorkerEvent struct { - Event int64 - Worker WorkerBase + // Event id, see below. + Event int64 + + // Worker triggered the event. + Worker WorkerBase + + // Event specific payload. Payload interface{} } type WorkerBase interface { fmt.Stringer - Created() time.Time + // Pid returns worker pid. + Pid() int64 - Events() <-chan WorkerEvent + // Created returns time worker was created at. + Created() time.Time - Pid() int64 + // AddListener attaches listener to consume worker events. + AddListener(listener util.EventListener) // State return receive-only WorkerProcess state object, state can be used to safely access // WorkerProcess status, time when status changed and number of WorkerProcess executions. @@ -88,7 +89,7 @@ type WorkerProcess struct { created time.Time // updates parent supervisor or pool about WorkerProcess events - events chan WorkerEvent + events *util.EventHandler // state holds information about current WorkerProcess state, // number of WorkerProcess executions, buf status change time. @@ -129,7 +130,7 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { } w := &WorkerProcess{ created: time.Now(), - events: make(chan WorkerEvent, 10), + events: &util.EventHandler{}, cmd: cmd, state: newState(StateInactive), } @@ -142,12 +143,23 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { return w, nil } +// Pid returns worker pid. +func (w *WorkerProcess) Pid() int64 { + return int64(w.pid) +} + +// Created returns time worker was created at. func (w *WorkerProcess) Created() time.Time { return w.created } -func (w *WorkerProcess) Pid() int64 { - return int64(w.pid) +// AddListener registers new worker event listener. +func (w *WorkerProcess) AddListener(listener util.EventListener) { + w.events.AddListener(listener) + + w.errBuffer.mu.Lock() + w.errBuffer.enable = true + w.errBuffer.mu.Unlock() } // State return receive-only WorkerProcess state object, state can be used to safely access @@ -195,10 +207,6 @@ func (w *WorkerProcess) Start() error { return nil } -func (w *WorkerProcess) Events() <-chan WorkerEvent { - return w.events -} - // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails @@ -208,15 +216,8 @@ func (w *WorkerProcess) Wait(ctx context.Context) error { w.endState = w.cmd.ProcessState if err != nil { w.state.Set(StateErrored) - // if there are messages in the events channel, read it - // TODO potentially danger place - if len(w.events) > 0 { - select { - case ev := <-w.events: - err = multierr.Append(err, errors.New(string(ev.Payload.([]byte)))) - } - } - // if no errors in the events, error might be in the errbuffer + + // if no errors in the events, error might be in the errBuffer if w.errBuffer.Len() > 0 { err = multierr.Append(err, errors.New(w.errBuffer.String())) } @@ -250,6 +251,7 @@ func (w *WorkerProcess) closeRelay() error { // Stop sends soft termination command to the WorkerProcess and waits for process completion. func (w *WorkerProcess) Stop(ctx context.Context) error { c := make(chan error) + go func() { var err error w.errBuffer.Close() @@ -264,6 +266,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error { w.state.Set(StateStopped) c <- nil }() + select { case <-ctx.Done(): return ctx.Err() @@ -290,16 +293,17 @@ func (w *WorkerProcess) Kill(ctx context.Context) error { } func (w *WorkerProcess) logCallback(log []byte) { - w.events <- WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log} + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}) } // thread safe errBuffer type errBuffer struct { - mu sync.RWMutex - buf []byte - last int - wait *time.Timer - // todo remove update + enable bool + mu sync.RWMutex + buf []byte + last int + wait *time.Timer + // todo: remove update update chan interface{} stop chan interface{} logCallback func(log []byte) @@ -321,7 +325,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb.wait.Reset(WaitDuration) case <-eb.wait.C: eb.mu.Lock() - if len(eb.buf) > eb.last { + if eb.enable && len(eb.buf) > eb.last { eb.logCallback(eb.buf[eb.last:]) eb.buf = eb.buf[0:0] eb.last = len(eb.buf) @@ -331,11 +335,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb.wait.Stop() eb.mu.Lock() - if len(eb.buf) > eb.last { - if eb == nil || eb.logCallback == nil { - eb.mu.Unlock() - return - } + if eb.enable && len(eb.buf) > eb.last { eb.logCallback(eb.buf[eb.last:]) eb.last = len(eb.buf) } diff --git a/worker_test.go b/worker_test.go index a90b7ef2..d2744345 100755 --- a/worker_test.go +++ b/worker_test.go @@ -91,6 +91,7 @@ func TestErrBuffer_Write_Event(t *testing.T) { assert.Equal(t, []byte("hello\n"), log) wg.Done() } + buf.enable = true _, err := buf.Write([]byte("hello\n")) if err != nil { @@ -116,6 +117,8 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) { assert.Equal(t, []byte("hello\nending"), log) wg.Done() } + buf.enable = true + _, err := buf.Write([]byte("hel")) if err != nil { t.Errorf("fail to write: error %v", err) diff --git a/workers_watcher.go b/worker_watcher.go index d9d27196..25c88a1a 100755 --- a/workers_watcher.go +++ b/worker_watcher.go @@ -5,6 +5,8 @@ import ( "errors" "sync" "time" + + "github.com/spiral/roadrunner/v2/util" ) var ErrWatcherStopped = errors.New("watcher stopped") @@ -59,36 +61,36 @@ func (stack *Stack) Pop() (WorkerBase, bool) { return w, false } -type WorkersWatcher struct { - mutex sync.RWMutex - stack *Stack - allocator func(args ...interface{}) (WorkerBase, error) - initialNumWorkers int64 - actualNumWorkers int64 - events chan PoolEvent -} - type WorkerWatcher interface { // AddToWatch used to add stack to wait its state AddToWatch(ctx context.Context, workers []WorkerBase) error + // GetFreeWorker provide first free worker GetFreeWorker(ctx context.Context) (WorkerBase, error) + // PutWorker enqueues worker back PushWorker(w WorkerBase) + // AllocateNew used to allocate new worker and put in into the WorkerWatcher AllocateNew(ctx context.Context) error + // Destroy destroys the underlying stack Destroy(ctx context.Context) + // WorkersList return all stack w/o removing it from internal storage WorkersList() []WorkerBase + // RemoveWorker remove worker from the stack RemoveWorker(ctx context.Context, wb WorkerBase) error } // workerCreateFunc can be nil, but in that case, dead stack will not be replaced -func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher { - // todo check if events not nil - ww := &WorkersWatcher{ +func newWorkerWatcher( + allocator func(args ...interface{}) (WorkerBase, error), + numWorkers int64, + events *util.EventHandler, +) *workerWatcher { + ww := &workerWatcher{ stack: NewWorkersStack(), allocator: allocator, initialNumWorkers: numWorkers, @@ -99,14 +101,23 @@ func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), n return ww } -func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error { +type workerWatcher struct { + mutex sync.RWMutex + stack *Stack + allocator func(args ...interface{}) (WorkerBase, error) + initialNumWorkers int64 + actualNumWorkers int64 + events *util.EventHandler +} + +func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error { for i := 0; i < len(workers); i++ { sw, err := NewSyncWorker(workers[i]) if err != nil { return err } ww.stack.Push(sw) - ww.watch(sw) + sw.AddListener(ww.events.Push) go func(swc WorkerBase) { ww.wait(ctx, swc) @@ -115,12 +126,13 @@ func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) return nil } -func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) { +func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) { // thread safe operation w, stop := ww.stack.Pop() if stop { return nil, ErrWatcherStopped } + // handle worker remove state // in this state worker is destroyed by supervisor if w != nil && w.State().Value() == StateRemove { @@ -131,6 +143,7 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) // try to get next return ww.GetFreeWorker(ctx) } + // no free stack if w == nil { tout := time.NewTicker(time.Second * 180) @@ -152,23 +165,31 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) } } } + ww.decreaseNumOfActualWorkers() return w, nil } -func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error { +func (ww *workerWatcher) AllocateNew(ctx context.Context) error { ww.stack.mutex.Lock() sw, err := ww.allocator() if err != nil { return err } + ww.addToWatch(sw) ww.stack.mutex.Unlock() ww.PushWorker(sw) + + ww.events.Push(PoolEvent{ + Event: EventWorkerConstruct, + Payload: sw, + }) + return nil } -func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error { +func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error { ww.stack.mutex.Lock() defer ww.stack.mutex.Unlock() pid := wb.Pid() @@ -193,19 +214,19 @@ func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error } // O(1) operation -func (ww *WorkersWatcher) PushWorker(w WorkerBase) { +func (ww *workerWatcher) PushWorker(w WorkerBase) { ww.mutex.Lock() ww.actualNumWorkers++ ww.mutex.Unlock() ww.stack.Push(w) } -func (ww *WorkersWatcher) ReduceWorkersCount() { +func (ww *workerWatcher) ReduceWorkersCount() { ww.decreaseNumOfActualWorkers() } // Destroy all underlying stack (but let them to complete the task) -func (ww *WorkersWatcher) Destroy(ctx context.Context) { +func (ww *workerWatcher) Destroy(ctx context.Context) { ww.stack.mutex.Lock() ww.stack.destroy = true ww.stack.mutex.Unlock() @@ -238,67 +259,62 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) { } // Warning, this is O(n) operation -func (ww *WorkersWatcher) WorkersList() []WorkerBase { +func (ww *workerWatcher) WorkersList() []WorkerBase { return ww.stack.workers } -func (ww *WorkersWatcher) wait(ctx context.Context, w WorkerBase) { +func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) { err := w.Wait(ctx) if err != nil { - ww.events <- PoolEvent{Payload: WorkerEvent{ + ww.events.Push(WorkerEvent{ Event: EventWorkerError, Worker: w, Payload: err, - }} + }) } - // If not destroyed, reallocate - if w.State().Value() != StateDestroyed { - pid := w.Pid() - ww.stack.mutex.Lock() - for i := 0; i < len(ww.stack.workers); i++ { - // worker in the stack, reallocating - if ww.stack.workers[i].Pid() == pid { - ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) - ww.decreaseNumOfActualWorkers() - ww.stack.mutex.Unlock() - err = ww.AllocateNew(ctx) - if err != nil { - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerError, - Worker: w, - Payload: err, - }} - return - } - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerConstruct, - Worker: nil, - Payload: nil, - }} - return + + if w.State().Value() == StateDestroyed { + // worker was manually destroyed, no need to replace + return + } + + pid := w.Pid() + ww.stack.mutex.Lock() + for i := 0; i < len(ww.stack.workers); i++ { + // worker in the stack, reallocating + if ww.stack.workers[i].Pid() == pid { + ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...) + ww.decreaseNumOfActualWorkers() + ww.stack.mutex.Unlock() + + err = ww.AllocateNew(ctx) + if err != nil { + ww.events.Push(PoolEvent{ + Event: EventPoolError, + Payload: err, + }) } - } - ww.stack.mutex.Unlock() - // worker not in the stack (not returned), forget and allocate new - err = ww.AllocateNew(ctx) - if err != nil { - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerError, - Worker: w, - Payload: err, - }} + return } - ww.events <- PoolEvent{Payload: WorkerEvent{ - Event: EventWorkerConstruct, - Worker: nil, - Payload: nil, - }} } + + ww.stack.mutex.Unlock() + + // worker not in the stack (not returned), forget and allocate new + err = ww.AllocateNew(ctx) + if err != nil { + ww.events.Push(PoolEvent{ + Event: EventPoolError, + Payload: err, + }) + return + } + return } -func (ww *WorkersWatcher) addToWatch(wb WorkerBase) { +func (ww *workerWatcher) addToWatch(wb WorkerBase) { ww.mutex.Lock() defer ww.mutex.Unlock() go func() { @@ -306,18 +322,8 @@ func (ww *WorkersWatcher) addToWatch(wb WorkerBase) { }() } -func (ww *WorkersWatcher) decreaseNumOfActualWorkers() { +func (ww *workerWatcher) decreaseNumOfActualWorkers() { ww.mutex.Lock() ww.actualNumWorkers-- ww.mutex.Unlock() } - -func (ww *WorkersWatcher) watch(swc WorkerBase) { - // todo make event to stop function - go func() { - select { - case ev := <-swc.Events(): - ww.events <- PoolEvent{Payload: ev} - } - }() -} |