summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xerrors.go6
-rwxr-xr-xerrors/go.mod2
-rwxr-xr-xerrors_test.go2
-rwxr-xr-xgo.mod2
-rwxr-xr-xgo.sum19
-rwxr-xr-xpipe_factory_test.go8
-rwxr-xr-xplugins/config/provider.go2
-rwxr-xr-xplugins/config/tests/config_test.go2
-rwxr-xr-xplugins/config/viper.go3
-rwxr-xr-xplugins/events/broadcaster.go24
-rwxr-xr-xplugins/factory/app.go112
-rwxr-xr-xplugins/factory/app_provider.go17
-rwxr-xr-xplugins/factory/config.go37
-rwxr-xr-xplugins/factory/factory.go76
-rwxr-xr-xplugins/factory/hello.php1
-rwxr-xr-xplugins/factory/tests/factory_test.go7
-rwxr-xr-xplugins/factory/tests/plugin_1.go8
-rwxr-xr-xplugins/factory/tests/plugin_2.go24
-rwxr-xr-xplugins/rpc/config.go3
-rwxr-xr-xplugins/rpc/rpc.go82
-rwxr-xr-xplugins/rpc/rpc_test.go1
-rwxr-xr-xpool.go97
-rwxr-xr-xpool_supervisor.go182
-rwxr-xr-xsocket_factory.go9
-rwxr-xr-xsocket_factory_test.go61
-rwxr-xr-xstatic_pool.go211
-rwxr-xr-xstatic_pool_test.go114
-rwxr-xr-xsupervisor_pool.go130
-rwxr-xr-xsync_worker.go116
-rwxr-xr-xsync_worker_test.go42
-rwxr-xr-xutil/events.go26
-rwxr-xr-xworker.go100
-rwxr-xr-xworker_test.go3
-rwxr-xr-xworker_watcher.go (renamed from workers_watcher.go)158
34 files changed, 798 insertions, 889 deletions
diff --git a/errors.go b/errors.go
index b9746702..52356549 100755
--- a/errors.go
+++ b/errors.go
@@ -1,11 +1,11 @@
package roadrunner
-// TaskError is job level error (no WorkerProcess halt), wraps at top
+// JobError is job level error (no WorkerProcess halt), wraps at top
// of error context
-type TaskError []byte
+type JobError []byte
// Error converts error context to string
-func (te TaskError) Error() string {
+func (te JobError) Error() string {
return string(te)
}
diff --git a/errors/go.mod b/errors/go.mod
index 60dac691..1eaacc23 100755
--- a/errors/go.mod
+++ b/errors/go.mod
@@ -1 +1,3 @@
module github.com/48d90782/errors
+
+go 1.15
diff --git a/errors_test.go b/errors_test.go
index 69f1c9ec..75a86840 100755
--- a/errors_test.go
+++ b/errors_test.go
@@ -8,7 +8,7 @@ import (
)
func Test_JobError_Error(t *testing.T) {
- e := TaskError([]byte("error"))
+ e := JobError([]byte("error"))
assert.Equal(t, "error", e.Error())
}
diff --git a/go.mod b/go.mod
index ddf0fe98..5adc9293 100755
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/shirou/gopsutil v2.20.9+incompatible
github.com/spf13/viper v1.7.1
- github.com/spiral/endure v1.0.0-beta8
+ github.com/spiral/endure v1.0.0-beta10
github.com/spiral/goridge/v2 v2.4.5
github.com/stretchr/testify v1.6.1
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
diff --git a/go.sum b/go.sum
index 72ca37a7..fef45035 100755
--- a/go.sum
+++ b/go.sum
@@ -35,6 +35,8 @@ github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
+github.com/corona10/goimagehash v1.0.2 h1:pUfB0LnsJASMPGEZLj7tGY251vF+qLGqOgEP4rUs6kA=
+github.com/corona10/goimagehash v1.0.2/go.mod h1:/l9umBhvcHQXVtQO1V6Gp1yD20STawkhRnnX0D1bvVI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -42,6 +44,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
+github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
+github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
@@ -52,8 +56,12 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/goccy/go-graphviz v0.0.8 h1:hYQikvj368s8+rmfzFOZeiCXvSocGH7rfAyLTOy/7AM=
+github.com/goccy/go-graphviz v0.0.8/go.mod h1:wXVsXxmyMQU6TN3zGRttjNn3h+iCAS7xQFC6TlNvLhk=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
+github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
@@ -100,6 +108,7 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
+github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@@ -141,6 +150,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY=
+github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
@@ -184,8 +195,8 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
-github.com/spiral/endure v1.0.0-beta8 h1:bKVe7F8CbvDZt8UYX3WoSV49OpFPHiM9Py55i7USPK8=
-github.com/spiral/endure v1.0.0-beta8/go.mod h1:EhC6CKaSQum/gz1zRqkyu4LqFOlngVTGbXK69pebmxQ=
+github.com/spiral/endure v1.0.0-beta10 h1:eAFnJspvmMRDL3u7iwWK8eewogMhq4TTG3CjCBPnbeI=
+github.com/spiral/endure v1.0.0-beta10/go.mod h1:mXFf8zPqr1SJ1cG0Zf59f6X+MvJzrdIwVjzQpa107e0=
github.com/spiral/goridge/v2 v2.4.5 h1:rg4lLEJLrEh1Wj6G1qTsYVbYiQvig6mOR1F9GyDIGm8=
github.com/spiral/goridge/v2 v2.4.5/go.mod h1:C/EZKFPON9lypi8QO7I5ObgVmrIzTmhZqFz/tmypcGc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -221,6 +232,7 @@ golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -228,6 +240,9 @@ golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm0
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
+golang.org/x/image v0.0.0-20200119044424-58c23975cae1/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
+golang.org/x/image v0.0.0-20200927104501-e162460cd6b5 h1:QelT11PB4FXiDEXucrfNckHoFxwt8USGY1ajP1ZF5lM=
+golang.org/x/image v0.0.0-20200927104501-e162460cd6b5/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index 95eededa..ee2510f3 100755
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -101,7 +101,7 @@ func Test_Pipe_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -129,7 +129,7 @@ func Test_Pipe_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -178,7 +178,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.ExecWithContext(context.Background(), Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -205,7 +205,7 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/plugins/config/provider.go b/plugins/config/provider.go
index 580231fd..ac33b3de 100755
--- a/plugins/config/provider.go
+++ b/plugins/config/provider.go
@@ -1,7 +1,7 @@
package config
type Provider interface {
- // Unmarshal configuration section into configuration object.
+ // UnmarshalKey reads configuration section into configuration object.
//
// func (h *HttpService) Init(cp config.Provider) error {
// h.config := &HttpConfig{}
diff --git a/plugins/config/tests/config_test.go b/plugins/config/tests/config_test.go
index c85a841f..14e60ac2 100755
--- a/plugins/config/tests/config_test.go
+++ b/plugins/config/tests/config_test.go
@@ -48,7 +48,7 @@ func TestViperProvider_Init(t *testing.T) {
for {
select {
case e := <-errCh:
- assert.NoError(t, e.Error.Err)
+ assert.NoError(t, e.Error)
assert.NoError(t, container.Stop())
return
case <-c:
diff --git a/plugins/config/viper.go b/plugins/config/viper.go
index 0c34313c..4e85af6b 100755
--- a/plugins/config/viper.go
+++ b/plugins/config/viper.go
@@ -14,6 +14,7 @@ type ViperProvider struct {
Prefix string
}
+// Inits config provider.
func (v *ViperProvider) Init() error {
v.viper = viper.New()
@@ -49,7 +50,7 @@ func (v *ViperProvider) Overwrite(values map[string]string) error {
return nil
}
-//
+// UnmarshalKey reads configuration section into configuration object.
func (v *ViperProvider) UnmarshalKey(name string, out interface{}) error {
err := v.viper.UnmarshalKey(name, &out)
if err != nil {
diff --git a/plugins/events/broadcaster.go b/plugins/events/broadcaster.go
deleted file mode 100755
index 778b307d..00000000
--- a/plugins/events/broadcaster.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package events
-
-type EventListener interface {
- Handle(event interface{})
-}
-
-type EventBroadcaster struct {
- listeners []EventListener
-}
-
-func NewEventBroadcaster() *EventBroadcaster {
- return &EventBroadcaster{}
-}
-
-func (eb *EventBroadcaster) AddListener(l EventListener) {
- // todo: threadcase
- eb.listeners = append(eb.listeners, l)
-}
-
-func (eb *EventBroadcaster) Push(e interface{}) {
- for _, l := range eb.listeners {
- l.Handle(e)
- }
-}
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index e4002963..4951e3df 100755
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -1,58 +1,76 @@
package factory
import (
- "errors"
+ "context"
"fmt"
+ "log"
"os"
"os/exec"
"strings"
- "time"
+ "github.com/fatih/color"
+ "github.com/spiral/endure/errors"
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
-// AppConfig config combines factory, pool and cmd configurations.
-type AppConfig struct {
- Command string
- User string
- Group string
- Env Env
-
- // Listen defines connection method and factory to be used to connect to workers:
- // "pipes", "tcp://:6001", "unix://rr.sock"
- // This config section must not change on re-configuration.
- Relay string
-
- // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
- // must not change on re-configuration.
- RelayTimeout time.Duration
+const ServiceName = "app"
+
+type Env map[string]string
+
+// AppFactory creates workers for the application.
+type AppFactory interface {
+ NewCmdFactory(env Env) (func() *exec.Cmd, error)
+ NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
+ NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error)
}
+// App manages worker
type App struct {
- cfg AppConfig
- configProvider config.Provider
+ cfg Config
+ factory roadrunner.Factory
}
-func (app *App) Init(provider config.Provider) error {
- app.cfg = AppConfig{}
- app.configProvider = provider
-
- err := app.configProvider.UnmarshalKey("app", &app.cfg)
+// Init application provider.
+func (app *App) Init(cfg config.Provider) error {
+ err := cfg.UnmarshalKey(ServiceName, &app.cfg)
if err != nil {
return err
}
+ app.cfg.InitDefaults()
+
+ return nil
+}
+
+// Name contains service name.
+func (app *App) Name() string {
+ return ServiceName
+}
+
+func (app *App) Serve() chan error {
+ errCh := make(chan error, 1)
+ var err error
+
+ app.factory, err = app.initFactory()
+ if err != nil {
+ errCh <- errors.E(errors.Op("init factory"), err)
+ }
+
+ return errCh
+}
- if app.cfg.Relay == "" {
- app.cfg.Relay = "pipes"
+func (app *App) Stop() error {
+ if app.factory == nil {
+ return nil
}
- return nil
+ return app.factory.Close(context.Background())
}
-func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
+func (app *App) NewCmdFactory(env Env) (func() *exec.Cmd, error) {
var cmdArgs []string
+
// create command according to the config
cmdArgs = append(cmdArgs, strings.Split(app.cfg.Command, " ")...)
@@ -75,15 +93,45 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
}, nil
}
-// todo ENV unused
-func (app *App) NewFactory(env Env) (roadrunner.Factory, error) {
+func (app *App) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+ spawnCmd, err := app.NewCmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ return app.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+}
+
+func (app *App) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
+ spawnCmd, err := app.NewCmdFactory(env)
+ if err != nil {
+ return nil, err
+ }
+
+ p, err := roadrunner.NewPool(ctx, spawnCmd, app.factory, opt)
+ if err != nil {
+ return nil, err
+ }
+
+ p.AddListener(func(event interface{}) {
+ if we, ok := event.(roadrunner.WorkerEvent); ok {
+ if we.Event == roadrunner.EventWorkerLog {
+ log.Print(color.YellowString(string(we.Payload.([]byte))))
+ }
+ }
+ })
+
+ return p, nil
+}
+
+func (app *App) initFactory() (roadrunner.Factory, error) {
if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
return roadrunner.NewPipeFactory(), nil
}
dsn := strings.Split(app.cfg.Relay, "://")
if len(dsn) != 2 {
- return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
lsn, err := util.CreateListener(app.cfg.Relay)
@@ -98,7 +146,7 @@ func (app *App) NewFactory(env Env) (roadrunner.Factory, error) {
case "tcp":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
default:
- return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
+ return nil, errors.E(errors.Str("invalid DSN (tcp://:6001, unix://file.sock)"))
}
}
diff --git a/plugins/factory/app_provider.go b/plugins/factory/app_provider.go
deleted file mode 100755
index 024c5bea..00000000
--- a/plugins/factory/app_provider.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package factory
-
-import (
- "os/exec"
-
- "github.com/spiral/roadrunner/v2"
-)
-
-type Env map[string]string
-
-type Spawner interface {
- // CmdFactory create new command factory with given env variables.
- NewCmd(env Env) (func() *exec.Cmd, error)
-
- // NewFactory inits new factory for workers.
- NewFactory(env Env) (roadrunner.Factory, error)
-}
diff --git a/plugins/factory/config.go b/plugins/factory/config.go
new file mode 100755
index 00000000..b2d1d0ad
--- /dev/null
+++ b/plugins/factory/config.go
@@ -0,0 +1,37 @@
+package factory
+
+import "time"
+
+// Config config combines factory, pool and cmd configurations.
+type Config struct {
+ // Command to run as application.
+ Command string
+
+ // User to run application under.
+ User string
+
+ // Group to run application under.
+ Group string
+
+ // Env represents application environment.
+ Env Env
+
+ // Listen defines connection method and factory to be used to connect to workers:
+ // "pipes", "tcp://:6001", "unix://rr.sock"
+ // This config section must not change on re-configuration.
+ Relay string
+
+ // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
+ // must not change on re-configuration. Defaults to 60s.
+ RelayTimeout time.Duration
+}
+
+func (cfg *Config) InitDefaults() {
+ if cfg.Relay == "" {
+ cfg.Relay = "pipes"
+ }
+
+ if cfg.RelayTimeout == 0 {
+ cfg.RelayTimeout = time.Second * 60
+ }
+}
diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go
deleted file mode 100755
index 5d80682d..00000000
--- a/plugins/factory/factory.go
+++ /dev/null
@@ -1,76 +0,0 @@
-package factory
-
-import (
- "context"
-
- "log"
-
- "github.com/fatih/color"
- "github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/plugins/events"
-)
-
-type WorkerFactory interface {
- NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
- NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error)
-}
-
-type WFactory struct {
- spw Spawner
- eb *events.EventBroadcaster
-}
-
-func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, env Env) (roadrunner.Pool, error) {
- cmd, err := wf.spw.NewCmd(env)
- if err != nil {
- return nil, err
- }
-
- factory, err := wf.spw.NewFactory(env)
- if err != nil {
- return nil, err
- }
-
- p, err := roadrunner.NewPool(ctx, cmd, factory, opt)
- if err != nil {
- return nil, err
- }
-
- // TODO event to stop
- go func() {
- for e := range p.Events() {
- wf.eb.Push(e)
- if we, ok := e.Payload.(roadrunner.WorkerEvent); ok {
- if we.Event == roadrunner.EventWorkerLog {
- log.Print(color.YellowString(string(we.Payload.([]byte))))
- }
- }
- }
- }()
-
- return p, nil
-}
-
-func (wf *WFactory) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
- cmd, err := wf.spw.NewCmd(env)
- if err != nil {
- return nil, err
- }
-
- wb, err := roadrunner.InitBaseWorker(cmd())
- if err != nil {
- return nil, err
- }
-
- return wb, nil
-}
-
-func (wf *WFactory) Init(app Spawner) error {
- wf.spw = app
- wf.eb = events.NewEventBroadcaster()
- return nil
-}
-
-func (wf *WFactory) AddListener(l events.EventListener) {
- wf.eb.AddListener(l)
-}
diff --git a/plugins/factory/hello.php b/plugins/factory/hello.php
deleted file mode 100755
index c6199449..00000000
--- a/plugins/factory/hello.php
+++ /dev/null
@@ -1 +0,0 @@
-<?php echo "hello -" . time(); \ No newline at end of file
diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go
index 5347083a..6c264fd6 100755
--- a/plugins/factory/tests/factory_test.go
+++ b/plugins/factory/tests/factory_test.go
@@ -31,11 +31,6 @@ func TestFactory(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&factory.WFactory{})
- if err != nil {
- t.Fatal(err)
- }
-
err = container.Register(&Foo{})
if err != nil {
t.Fatal(err)
@@ -65,7 +60,7 @@ func TestFactory(t *testing.T) {
for {
select {
case e := <-errCh:
- assert.NoError(t, e.Error.Err)
+ assert.NoError(t, e.Error)
assert.NoError(t, container.Stop())
return
case <-c:
diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go
index 5ab6df73..9011bb00 100755
--- a/plugins/factory/tests/plugin_1.go
+++ b/plugins/factory/tests/plugin_1.go
@@ -10,10 +10,10 @@ import (
type Foo struct {
configProvider config.Provider
- spawner factory.Spawner
+ spawner factory.AppFactory
}
-func (f *Foo) Init(p config.Provider, spw factory.Spawner) error {
+func (f *Foo) Init(p config.Provider, spw factory.AppFactory) error {
f.configProvider = p
f.spawner = spw
return nil
@@ -22,14 +22,14 @@ func (f *Foo) Init(p config.Provider, spw factory.Spawner) error {
func (f *Foo) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.AppConfig{}
+ r := &factory.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.spawner.NewCmd(nil)
+ cmd, err := f.spawner.NewCmdFactory(nil)
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/factory/tests/plugin_2.go b/plugins/factory/tests/plugin_2.go
index 2409627e..9f401bec 100755
--- a/plugins/factory/tests/plugin_2.go
+++ b/plugins/factory/tests/plugin_2.go
@@ -13,28 +13,26 @@ import (
type Foo2 struct {
configProvider config.Provider
- wf factory.WorkerFactory
- spw factory.Spawner
+ wf factory.AppFactory
}
-func (f *Foo2) Init(p config.Provider, workerFactory factory.WorkerFactory, spawner factory.Spawner) error {
+func (f *Foo2) Init(p config.Provider, workerFactory factory.AppFactory) error {
f.configProvider = p
f.wf = workerFactory
- f.spw = spawner
return nil
}
func (f *Foo2) Serve() chan error {
errCh := make(chan error, 1)
- r := &factory.AppConfig{}
+ r := &factory.Config{}
err := f.configProvider.UnmarshalKey("app", r)
if err != nil {
errCh <- err
return errCh
}
- cmd, err := f.spw.NewCmd(nil)
+ cmd, err := f.wf.NewCmdFactory(nil)
if err != nil {
errCh <- err
return errCh
@@ -58,16 +56,18 @@ func (f *Foo2) Serve() chan error {
_ = w
- poolConfig := &roadrunner.Config{
+ poolConfig := roadrunner.Config{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
DestroyTimeout: time.Second * 10,
- TTL: 1000,
- IdleTTL: 1000,
- ExecTTL: time.Second * 10,
- MaxPoolMemory: 10000,
- MaxWorkerMemory: 10000,
+ Supervisor: roadrunner.SupervisorConfig{
+ WatchTick: 60,
+ TTL: 1000,
+ IdleTTL: 10,
+ ExecTTL: time.Second * 10,
+ MaxWorkerMemory: 1000,
+ },
}
pool, err := f.wf.NewWorkerPool(context.Background(), poolConfig, nil)
diff --git a/plugins/rpc/config.go b/plugins/rpc/config.go
index 1039ee5e..719fd5e3 100755
--- a/plugins/rpc/config.go
+++ b/plugins/rpc/config.go
@@ -12,6 +12,9 @@ import (
type Config struct {
// Listen string
Listen string
+
+ // Disabled disables RPC service.
+ Disabled bool
}
// InitDefaults allows to init blank config with pre-defined set of default values.
diff --git a/plugins/rpc/rpc.go b/plugins/rpc/rpc.go
index 6568eea3..0f6c9753 100755
--- a/plugins/rpc/rpc.go
+++ b/plugins/rpc/rpc.go
@@ -1,21 +1,24 @@
package rpc
import (
- "errors"
+ "net/rpc"
+ "github.com/spiral/endure"
+ "github.com/spiral/endure/errors"
"github.com/spiral/goridge/v2"
"github.com/spiral/roadrunner/v2/plugins/config"
-
- "net/rpc"
)
-type PluginRpc interface {
- Name() string
- RpcService() (interface{}, error)
+// RPCPluggable declares the ability to create set of public RPC methods.
+type RPCPluggable interface {
+ endure.Named
+
+ // Provides RPC methods for the given service.
+ RPCService() (interface{}, error)
}
-// ID contains default service name.
-const ID = "rpc"
+// ServiceName contains default service name.
+const ServiceName = "rpc"
type services struct {
service interface{}
@@ -24,52 +27,48 @@ type services struct {
// Service is RPC service.
type Service struct {
- // TODO do we need a pointer here since all receivers are pointers??
- rpc *rpc.Server
- configProvider config.Provider
- services []services
- config Config
- close chan struct{}
+ rpc *rpc.Server
+ services []services
+ config Config
+ close chan struct{}
}
// Init rpc service. Must return true if service is enabled.
func (s *Service) Init(cfg config.Provider) error {
- s.configProvider = cfg
- err := s.configProvider.UnmarshalKey(ID, &s.config)
+ if !cfg.Has(ServiceName) {
+ return errors.E(errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(ServiceName, &s.config)
if err != nil {
return err
}
+ s.config.InitDefaults()
- // TODO Do we need to init defaults
- if s.config.Listen == "" {
- s.config.InitDefaults()
+ if s.config.Disabled {
+ return errors.E(errors.Disabled)
}
- s.close = make(chan struct{})
+ return s.config.Valid()
+}
- return nil
+// Name contains service name.
+func (s *Service) Name() string {
+ return ServiceName
}
// Serve serves the service.
func (s *Service) Serve() chan error {
+ s.close = make(chan struct{}, 1)
errCh := make(chan error, 1)
- server := rpc.NewServer()
- if server == nil {
- errCh <- errors.New("rpc server is nil")
- return errCh
- }
- s.rpc = server
- if len(s.services) == 0 {
- errCh <- errors.New("no services with RPC")
- return errCh
- }
+ s.rpc = rpc.NewServer()
// Attach all services
for i := 0; i < len(s.services); i++ {
err := s.Register(s.services[i].name, s.services[i].service)
if err != nil {
- errCh <- err
+ errCh <- errors.E(errors.Op("register service"), err)
return errCh
}
}
@@ -85,7 +84,10 @@ func (s *Service) Serve() chan error {
select {
case <-s.close:
// log error
- errCh <- ln.Close()
+ err := ln.Close()
+ if err != nil {
+ errCh <- errors.E(errors.Op("close RPC socket"), err)
+ }
return
default:
conn, err := ln.Accept()
@@ -98,7 +100,7 @@ func (s *Service) Serve() chan error {
}
}()
- return nil
+ return errCh
}
// Stop stops the service.
@@ -109,12 +111,12 @@ func (s *Service) Stop() error {
func (s *Service) Depends() []interface{} {
return []interface{}{
- s.RpcService,
+ s.RegisterService,
}
}
-func (s *Service) RpcService(p PluginRpc) error {
- service, err := p.RpcService()
+func (s *Service) RegisterService(p RPCPluggable) error {
+ service, err := p.RPCService()
if err != nil {
return err
}
@@ -136,7 +138,7 @@ func (s *Service) RpcService(p PluginRpc) error {
// no suitable methods. It also logs the error using package log.
func (s *Service) Register(name string, svc interface{}) error {
if s.rpc == nil {
- return errors.New("RPC service is not configured")
+ return errors.E("RPC service is not configured")
}
return s.rpc.RegisterName(name, svc)
@@ -144,10 +146,6 @@ func (s *Service) Register(name string, svc interface{}) error {
// Client creates new RPC client.
func (s *Service) Client() (*rpc.Client, error) {
- if s.configProvider == nil {
- return nil, errors.New("RPC service is not configured")
- }
-
conn, err := s.config.Dialer()
if err != nil {
return nil, err
diff --git a/plugins/rpc/rpc_test.go b/plugins/rpc/rpc_test.go
deleted file mode 100755
index 9ab1e3e8..00000000
--- a/plugins/rpc/rpc_test.go
+++ /dev/null
@@ -1 +0,0 @@
-package rpc
diff --git a/pool.go b/pool.go
index 343dedf6..bc57bcbd 100755
--- a/pool.go
+++ b/pool.go
@@ -4,49 +4,52 @@ import (
"context"
"runtime"
"time"
+
+ "github.com/spiral/roadrunner/v2/util"
)
+// PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
+type PoolEvent struct {
+ // Event type, see below.
+ Event int64
+
+ // Payload depends on event type, typically it's worker or error.
+ Payload interface{}
+}
+
const (
// EventWorkerConstruct thrown when new worker is spawned.
- EventWorkerConstruct = iota + 100
+ EventWorkerConstruct = iota + 7800
// EventWorkerDestruct thrown after worker destruction.
EventWorkerDestruct
- // EventWorkerKill thrown after worker is being forcefully killed.
- EventWorkerKill
-
- // EventWorkerError thrown any worker related even happen (passed with WorkerError)
- EventWorkerEvent
-
- // EventWorkerDead thrown when worker stops worker for any reason.
- EventWorkerDead
-
- // EventPoolError caused on pool wide errors
+ // EventPoolError caused on pool wide errors.
EventPoolError
-)
-const (
- // EventMaxMemory caused when worker consumes more memory than allowed.
- EventMaxMemory = iota + 8000
+ // EventSupervisorError triggered when supervisor can not complete work.
+ EventSupervisorError
- // EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
+ // todo: EventMaxMemory caused when worker consumes more memory than allowed.
+ EventMaxMemory
+
+ // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError
EventTTL
- // EventIdleTTL triggered when worker spends too much time at rest.
+ // todo: EventIdleTTL triggered when worker spends too much time at rest.
EventIdleTTL
- // EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
+ // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time).
EventExecTTL
)
// Pool managed set of inner worker processes.
type Pool interface {
- // ATTENTION, YOU SHOULD CONSUME EVENTS, OTHERWISE POOL WILL BLOCK
- Events() chan PoolEvent
+ // AddListener connects event listener to the pool.
+ AddListener(listener util.EventListener)
- // Exec one task with given payload and context, returns result or error.
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+ // GetConfig returns pool configuration.
+ GetConfig() Config
// Exec
Exec(rqs Payload) (Payload, error)
@@ -54,18 +57,14 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []WorkerBase)
+ // Remove worker from the pool.
RemoveWorker(ctx context.Context, worker WorkerBase) error
- Config() Config
-
// Destroy all underlying stack (but let them to complete the task).
Destroy(ctx context.Context)
}
-// todo: merge with pool options
-
-// Config defines basic behaviour of worker creation and handling process.
-//
+// Configures the pool behaviour.
type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
@@ -84,20 +83,8 @@ type Config struct {
// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
DestroyTimeout time.Duration
- // TTL defines maximum time worker is allowed to live.
- TTL int64
-
- // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
- IdleTTL int64
-
- // ExecTTL defines maximum lifetime per job.
- ExecTTL time.Duration
-
- // MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes.
- MaxPoolMemory uint64
-
- // MaxWorkerMemory limits memory per worker.
- MaxWorkerMemory uint64
+ // Supervision config to limit worker and pool memory usage.
+ Supervisor SupervisorConfig
}
// InitDefaults enables default config values.
@@ -113,4 +100,30 @@ func (cfg *Config) InitDefaults() {
if cfg.DestroyTimeout == 0 {
cfg.DestroyTimeout = time.Minute
}
+
+ cfg.Supervisor.InitDefaults()
+}
+
+type SupervisorConfig struct {
+ // WatchTick defines how often to check the state of worker.
+ WatchTick time.Duration
+
+ // TTL defines maximum time worker is allowed to live.
+ TTL int64
+
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
+ IdleTTL int64
+
+ // ExecTTL defines maximum lifetime per job.
+ ExecTTL time.Duration
+
+ // MaxWorkerMemory limits memory per worker.
+ MaxWorkerMemory uint64
+}
+
+// InitDefaults enables default config values.
+func (cfg *SupervisorConfig) InitDefaults() {
+ if cfg.WatchTick == 0 {
+ cfg.WatchTick = time.Second
+ }
}
diff --git a/pool_supervisor.go b/pool_supervisor.go
deleted file mode 100755
index c0a6ecd9..00000000
--- a/pool_supervisor.go
+++ /dev/null
@@ -1,182 +0,0 @@
-package roadrunner
-
-import (
- "context"
- "errors"
- "fmt"
- "time"
-)
-
-const MB = 1024 * 1024
-
-type Supervisor interface {
- Attach(pool Pool)
- StartWatching() error
- StopWatching()
- Detach()
-}
-
-type staticPoolSupervisor struct {
- // maxWorkerMemory in MB
- maxWorkerMemory uint64
- // maxPoolMemory in MB
- maxPoolMemory uint64
- // maxWorkerTTL in seconds
- maxWorkerTTL uint64
- // maxWorkerIdle in seconds
- maxWorkerIdle uint64
-
- // watchTimeout in seconds
- watchTimeout uint64
- stopCh chan struct{}
-
- pool Pool
-}
-
-/*
-The arguments are:
-maxWorkerMemory - maximum memory allowed for a single worker
-maxPoolMemory - maximum pool memory allowed for a pool of a workers
-maxTtl - maximum ttl for the worker after which it will be killed and replaced
-maxIdle - maximum time to live for the worker in Ready state
-watchTimeout - time between watching for the workers/pool status
-*/
-// TODO might be just wrap the pool and return ControlledPool with included Pool interface
-func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, watchTimeout uint64) Supervisor {
- if maxWorkerMemory == 0 {
- // just set to a big number, 5GB
- maxPoolMemory = 5000 * MB
- }
-
- if watchTimeout == 0 {
- watchTimeout = 60
- }
-
- return &staticPoolSupervisor{
- maxWorkerMemory: maxWorkerMemory,
- maxPoolMemory: maxPoolMemory,
- maxWorkerTTL: maxTtl,
- maxWorkerIdle: maxIdle,
- watchTimeout: watchTimeout,
- stopCh: make(chan struct{}),
- }
-}
-
-func (sps *staticPoolSupervisor) Attach(pool Pool) {
- sps.pool = pool
-}
-
-func (sps *staticPoolSupervisor) StartWatching() error {
- go func() {
- watchTout := time.NewTicker(time.Second * time.Duration(sps.watchTimeout))
- for {
- select {
- case <-sps.stopCh:
- watchTout.Stop()
- return
- // stop here
- case <-watchTout.C:
- err := sps.control()
- if err != nil {
- sps.pool.Events() <- PoolEvent{Payload: err}
- }
- }
- }
- }()
- return nil
-}
-
-func (sps *staticPoolSupervisor) StopWatching() {
- sps.stopCh <- struct{}{}
-}
-
-func (sps *staticPoolSupervisor) Detach() {
-
-}
-
-func (sps *staticPoolSupervisor) control() error {
- if sps.pool == nil {
- return errors.New("pool should be attached")
- }
- now := time.Now()
- ctx := context.TODO()
-
- // THIS IS A COPY OF WORKERS
- workers := sps.pool.Workers()
- totalUsedMemory := uint64(0)
-
- for i := 0; i < len(workers); i++ {
- if workers[i].State().Value() == StateInvalid {
- continue
- }
-
- s, err := WorkerProcessState(workers[i])
- if err != nil {
- err2 := sps.pool.RemoveWorker(ctx, workers[i])
- if err2 != nil {
- sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)}
- return fmt.Errorf("worker process state error: %v, Remove worker error: %v", err, err2)
- }
- sps.pool.Events() <- PoolEvent{Payload: err}
- return err
- }
-
- if sps.maxWorkerTTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sps.maxWorkerTTL) {
- err = sps.pool.RemoveWorker(ctx, workers[i])
- if err != nil {
- return err
- }
-
- // after remove worker we should exclude it from further analysis
- workers = append(workers[:i], workers[i+1:]...)
- }
-
- if sps.maxWorkerMemory != 0 && s.MemoryUsage >= sps.maxWorkerMemory*MB {
- // TODO events
- sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed memory reached (%vMB)", sps.maxWorkerMemory)}
- err = sps.pool.RemoveWorker(ctx, workers[i])
- if err != nil {
- return err
- }
- workers = append(workers[:i], workers[i+1:]...)
- continue
- }
-
- // firs we check maxWorker idle
- if sps.maxWorkerIdle != 0 {
- // then check for the worker state
- if workers[i].State().Value() != StateReady {
- continue
- }
- /*
- Calculate idle time
- If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
- 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
- we are guessing that worker overlap idle time and has to be killed
- */
- // get last used unix nano
- lu := workers[i].State().LastUsed()
- // convert last used to unixNano and sub time.now
- res := int64(lu) - now.UnixNano()
- // maxWorkerIdle more than diff between now and last used
- if int64(sps.maxWorkerIdle)-res <= 0 {
- sps.pool.Events() <- PoolEvent{Payload: fmt.Errorf("max allowed worker idle time elapsed. actual idle time: %v, max idle time: %v", sps.maxWorkerIdle, res)}
- err = sps.pool.RemoveWorker(ctx, workers[i])
- if err != nil {
- return err
- }
- workers = append(workers[:i], workers[i+1:]...)
- }
- }
-
- // the very last step is to calculate pool memory usage (except excluded workers)
- totalUsedMemory += s.MemoryUsage
- }
-
- // if current usage more than max allowed pool memory usage
- if totalUsedMemory > sps.maxPoolMemory {
- sps.pool.Destroy(ctx)
- }
-
- return nil
-}
diff --git a/socket_factory.go b/socket_factory.go
index 27558cce..ed151f2d 100755
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -8,6 +8,8 @@ import (
"sync"
"time"
+ "github.com/shirou/gopsutil/process"
+
"github.com/pkg/errors"
"github.com/spiral/goridge/v2"
"go.uber.org/multierr"
@@ -110,6 +112,7 @@ func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cm
w.Kill(context.Background()),
w.Wait(context.Background()),
)
+
c <- socketSpawn{
w: nil,
err: err,
@@ -178,10 +181,16 @@ func (f *SocketFactory) Close(ctx context.Context) error {
// waits for WorkerProcess to connect over socket and returns associated relay of timeout
func (f *SocketFactory) findRelayWithContext(ctx context.Context, w WorkerBase) (*goridge.SocketRelay, error) {
+ ticker := time.NewTicker(time.Millisecond * 100)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
+ case <-ticker.C:
+ _, err := process.NewProcess(int32(w.Pid()))
+ if err != nil {
+ return nil, err
+ }
default:
tmp, ok := f.relays.Load(w.Pid())
if !ok {
diff --git a/socket_factory_test.go b/socket_factory_test.go
index cfb95ca1..6ab87872 100755
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -98,28 +98,29 @@ func Test_Tcp_StartError(t *testing.T) {
assert.Nil(t, w)
}
-// func Test_Tcp_Failboot(t *testing.T) {
-// time.Sleep(time.Millisecond * 10) // to ensure free socket
-//
-// ls, err := net.Listen("tcp", "localhost:9007")
-// if assert.NoError(t, err) {
-// defer func() {
-// err3 := ls.Close()
-// if err3 != nil {
-// t.Errorf("error closing the listener: error %v", err3)
-// }
-// }()
-// } else {
-// t.Skip("socket is busy")
-// }
-//
-// cmd := exec.Command("php", "tests/failboot.php")
-//
-// w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(cmd)
-// assert.Nil(t, w)
-// assert.Error(t, err2)
-// assert.Contains(t, err2.Error(), "failboot")
-//}
+func Test_Tcp_Failboot(t *testing.T) {
+ time.Sleep(time.Millisecond * 10) // to ensure free socket
+ ctx := context.Background()
+
+ ls, err := net.Listen("tcp", "localhost:9007")
+ if assert.NoError(t, err) {
+ defer func() {
+ err3 := ls.Close()
+ if err3 != nil {
+ t.Errorf("error closing the listener: error %v", err3)
+ }
+ }()
+ } else {
+ t.Skip("socket is busy")
+ }
+
+ cmd := exec.Command("php", "tests/failboot.php")
+
+ w, err2 := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ assert.Nil(t, w)
+ assert.Error(t, err2)
+ assert.Contains(t, err2.Error(), "failboot")
+}
func Test_Tcp_Timeout(t *testing.T) {
time.Sleep(time.Millisecond * 10) // to ensure free socket
@@ -161,7 +162,7 @@ func Test_Tcp_Invalid(t *testing.T) {
cmd := exec.Command("php", "tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -208,7 +209,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -248,7 +249,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -304,7 +305,7 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*2).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failboot")
@@ -393,7 +394,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
@@ -436,7 +437,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -512,7 +513,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -580,7 +581,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/static_pool.go b/static_pool.go
index 0c2352ad..4ecbdd41 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -6,20 +6,19 @@ import (
"os/exec"
"sync"
+ "github.com/spiral/roadrunner/v2/util"
+
"github.com/pkg/errors"
)
-const (
- // StopRequest can be sent by worker to indicate that restart is required.
- StopRequest = "{\"stop\":true}"
-)
+// StopRequest can be sent by worker to indicate that restart is required.
+const StopRequest = "{\"stop\":true}"
var bCtx = context.Background()
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
- // pool behaviour
- cfg *Config
+ cfg Config
// worker command creator
cmd func() *exec.Cmd
@@ -27,30 +26,31 @@ type StaticPool struct {
// creates and connects to stack
factory Factory
+ // distributes the events
+ events *util.EventHandler
+
// protects state of worker list, does not affect allocation
muw sync.RWMutex
- ww *WorkersWatcher
+ // manages worker states and TTLs
+ ww *workerWatcher
- events chan PoolEvent
-}
-type PoolEvent struct {
- Payload interface{}
+ // supervises memory and TTL of workers
+ // sp *supervisedPool
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-// TODO why cfg is passed by pointer?
-func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) {
+func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) {
cfg.InitDefaults()
p := &StaticPool{
cfg: cfg,
cmd: cmd,
factory: factory,
- events: make(chan PoolEvent),
+ events: &util.EventHandler{},
}
- p.ww = NewWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
+ p.ww = newWorkerWatcher(func(args ...interface{}) (WorkerBase, error) {
w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd())
if err != nil {
return nil, err
@@ -74,12 +74,21 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Co
return nil, err
}
+ // todo: implement
+ // p.sp = newPoolWatcher(p, p.events, p.cfg.Supervisor)
+ // p.sp.Start()
+
return p, nil
}
+// AddListener connects event listener to the pool.
+func (p *StaticPool) AddListener(listener util.EventListener) {
+ p.events.AddListener(listener)
+}
+
// Config returns associated pool configuration. Immutable.
-func (p *StaticPool) Config() Config {
- return *p.cfg
+func (p *StaticPool) GetConfig() Config {
+ return p.cfg
}
// Workers returns worker list associated with the pool.
@@ -103,18 +112,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
rsp, err := sw.Exec(rqs)
if err != nil {
- errJ := p.checkMaxJobs(bCtx, w)
- if errJ != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
- }
// soft job errors are allowed
- if _, jobError := err.(TaskError); jobError {
- p.ww.PushWorker(w)
+ if _, jobError := err.(JobError); jobError {
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err := p.ww.AllocateNew(bCtx)
+ if err != nil {
+ p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ }
+
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ }
+ } else {
+ p.ww.PushWorker(w)
+ }
+
return EmptyPayload, err
}
sw.State().Set(StateInvalid)
+ p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
+
if errS != nil {
return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
}
@@ -127,9 +148,10 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- return EmptyPayload, err
+ p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
- return p.ExecWithContext(bCtx, rqs)
+
+ return p.Exec(rqs)
}
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
@@ -146,81 +168,81 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
}
// Exec one task with given payload and context, returns result or error.
-func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- // todo: why TODO passed here?
- getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
- defer cancel()
- w, err := p.ww.GetFreeWorker(getWorkerCtx)
- if err != nil && errors.Is(err, ErrWatcherStopped) {
- return EmptyPayload, ErrWatcherStopped
- } else if err != nil {
- return EmptyPayload, err
- }
-
- sw := w.(SyncWorker)
-
- var execCtx context.Context
- if p.cfg.ExecTTL != 0 {
- var cancel2 context.CancelFunc
- execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL)
- defer cancel2()
- } else {
- execCtx = ctx
- }
-
- rsp, err := sw.ExecWithContext(execCtx, rqs)
- if err != nil {
- errJ := p.checkMaxJobs(ctx, w)
- if errJ != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
- }
- // soft job errors are allowed
- if _, jobError := err.(TaskError); jobError {
- p.ww.PushWorker(w)
- return EmptyPayload, err
- }
-
- sw.State().Set(StateInvalid)
- errS := w.Stop(ctx)
- if errS != nil {
- return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
- }
-
- return EmptyPayload, err
- }
-
- // worker want's to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- w.State().Set(StateInvalid)
- err = w.Stop(ctx)
- if err != nil {
- return EmptyPayload, err
- }
- return p.ExecWithContext(ctx, rqs)
- }
-
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err = p.ww.AllocateNew(ctx)
- if err != nil {
- return EmptyPayload, err
- }
- } else {
- p.muw.Lock()
- p.ww.PushWorker(w)
- p.muw.Unlock()
- }
- return rsp, nil
-}
+// func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+// // todo: why TODO passed here?
+// getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
+// defer cancel()
+// w, err := p.ww.GetFreeWorker(getWorkerCtx)
+// if err != nil && errors.Is(err, ErrWatcherStopped) {
+// return EmptyPayload, ErrWatcherStopped
+// } else if err != nil {
+// return EmptyPayload, err
+// }
+//
+// sw := w.(SyncWorker)
+//
+// // todo: implement worker destroy
+// //execCtx context.Context
+// //if p.cfg.Supervisor.ExecTTL != 0 {
+// // var cancel2 context.CancelFunc
+// // execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.Supervisor.ExecTTL)
+// // defer cancel2()
+// //} else {
+// // execCtx = ctx
+// //}
+//
+// rsp, err := sw.Exec(rqs)
+// if err != nil {
+// errJ := p.checkMaxJobs(ctx, w)
+// if errJ != nil {
+// // todo: worker was not destroyed
+// return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
+// }
+//
+// // soft job errors are allowed
+// if _, jobError := err.(JobError); jobError {
+// p.ww.PushWorker(w)
+// return EmptyPayload, err
+// }
+//
+// sw.State().Set(StateInvalid)
+// errS := w.Stop(ctx)
+// if errS != nil {
+// return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+// }
+//
+// return EmptyPayload, err
+// }
+//
+// // worker want's to be terminated
+// if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+// w.State().Set(StateInvalid)
+// err = w.Stop(ctx)
+// if err != nil {
+// return EmptyPayload, err
+// }
+// return p.ExecWithContext(ctx, rqs)
+// }
+//
+// if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+// err = p.ww.AllocateNew(ctx)
+// if err != nil {
+// return EmptyPayload, err
+// }
+// } else {
+// p.muw.Lock()
+// p.ww.PushWorker(w)
+// p.muw.Unlock()
+// }
+//
+// return rsp, nil
+// }
// Destroy all underlying stack (but let them to complete the task).
func (p *StaticPool) Destroy(ctx context.Context) {
p.ww.Destroy(ctx)
}
-func (p *StaticPool) Events() chan PoolEvent {
- return p.events
-}
-
// allocate required number of stack
func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) {
var workers []WorkerBase
@@ -243,6 +265,7 @@ func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
err := p.ww.AllocateNew(ctx)
if err != nil {
+ p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
return err
}
}
diff --git a/static_pool_test.go b/static_pool_test.go
index ce9e6820..ec80e92a 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -2,7 +2,6 @@ package roadrunner
import (
"context"
- "fmt"
"log"
"os/exec"
"runtime"
@@ -18,7 +17,6 @@ var cfg = Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
}
func Test_NewPool(t *testing.T) {
@@ -27,12 +25,10 @@ func Test_NewPool(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
- assert.Equal(t, cfg, p.Config())
-
defer p.Destroy(ctx)
assert.NotNil(t, p)
@@ -43,7 +39,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.Nil(t, p)
@@ -55,7 +51,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
@@ -71,7 +67,7 @@ func Test_StaticPool_Echo(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
@@ -79,7 +75,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -95,7 +91,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
@@ -103,7 +99,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.Exec(Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -119,7 +115,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
@@ -127,7 +123,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.Exec(Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -143,20 +139,20 @@ func Test_StaticPool_JobError(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, TaskError{}, err)
+ assert.IsType(t, JobError{}, err)
assert.Equal(t, "hello", err.Error())
}
@@ -167,7 +163,7 @@ func Test_StaticPool_JobError(t *testing.T) {
// ctx,
// func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
// NewPipeFactory(),
-// &cfg,
+// cfg,
// )
// assert.NoError(t, err)
// assert.NotNil(t, p)
@@ -177,6 +173,10 @@ func Test_StaticPool_JobError(t *testing.T) {
// var i int64
// atomic.StoreInt64(&i, 10)
//
+// p.AddListener(func(event interface{}) {
+//
+// })
+//
// go func() {
// for {
// select {
@@ -197,7 +197,7 @@ func Test_StaticPool_JobError(t *testing.T) {
// wg.Wait()
//
// p.Destroy(ctx)
-//}
+// }
//
func Test_StaticPool_Broken_FromOutside(t *testing.T) {
@@ -206,14 +206,14 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
assert.NoError(t, err)
defer p.Destroy(ctx)
assert.NotNil(t, p)
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -226,17 +226,13 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
// Consume pool events
wg := sync.WaitGroup{}
wg.Add(1)
- go func() {
- for true {
- select {
- case ev := <-p.Events():
- fmt.Println(ev)
- if ev.Payload.(WorkerEvent).Event == EventWorkerConstruct {
- wg.Done()
- }
+ p.AddListener(func(event interface{}) {
+ if pe, ok := event.(PoolEvent); ok {
+ if pe.Event == EventWorkerConstruct {
+ wg.Done()
}
}
- }()
+ })
// killing random worker and expecting pool to replace it
err = p.Workers()[0].Kill(ctx)
@@ -258,11 +254,10 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
DestroyTimeout: time.Second * 2,
- ExecTTL: time.Second * 4,
},
)
assert.Error(t, err)
@@ -275,12 +270,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 4,
},
)
assert.NoError(t, err)
@@ -291,11 +285,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, _ := p.Exec(Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -314,11 +308,10 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 15,
},
)
assert.NoError(t, err)
@@ -326,26 +319,17 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
assert.NotNil(t, p)
- go func() {
- for {
- select {
- case ev := <-p.Events():
- fmt.Println(ev)
- }
- }
- }()
-
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := p.Exec(Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -364,11 +348,10 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 4,
},
)
@@ -376,7 +359,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")})
+ _, err = p.Exec(Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -387,11 +370,10 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 4,
},
)
@@ -399,7 +381,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.ExecWithContext(ctx, Payload{Body: []byte("100")})
+ _, err := p.Exec(Payload{Body: []byte("100")})
if err != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -407,7 +389,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 10)
p.Destroy(ctx)
- _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")})
+ _, err = p.Exec(Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -418,11 +400,10 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
assert.NoError(t, err)
@@ -434,7 +415,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
w.State().Set(StateErrored)
}
- _, err = p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ _, err = p.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
}
@@ -444,11 +425,10 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/slow-destroy.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
@@ -464,7 +444,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &cfg,
+ cfg,
)
if err != nil {
b.Fatal(err)
@@ -473,7 +453,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -486,11 +466,10 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
defer p.Destroy(ctx)
@@ -500,7 +479,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -517,12 +496,11 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
ctx,
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
- &Config{
+ Config{
NumWorkers: 1,
MaxJobs: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
- ExecTTL: time.Second * 5,
},
)
defer p.Destroy(ctx)
@@ -530,7 +508,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.Exec(Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/supervisor_pool.go b/supervisor_pool.go
new file mode 100755
index 00000000..9d1d2b1e
--- /dev/null
+++ b/supervisor_pool.go
@@ -0,0 +1,130 @@
+package roadrunner
+
+import (
+ "context"
+ "time"
+
+ "github.com/spiral/roadrunner/v2/util"
+)
+
+const MB = 1024 * 1024
+
+type SupervisedPool interface {
+ Pool
+
+ // ExecWithContext provides the ability to execute with time deadline. Attention, worker will be destroyed if context
+ // deadline reached.
+ ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+}
+
+type supervisedPool struct {
+ cfg SupervisorConfig
+ events *util.EventHandler
+ pool Pool
+ stopCh chan struct{}
+}
+
+func newPoolWatcher(pool *StaticPool, events *util.EventHandler, cfg SupervisorConfig) *supervisedPool {
+ return &supervisedPool{
+ cfg: cfg,
+ events: events,
+ pool: pool,
+ stopCh: make(chan struct{}),
+ }
+}
+
+func (sp *supervisedPool) Start() {
+ go func() {
+ watchTout := time.NewTicker(sp.cfg.WatchTick)
+ for {
+ select {
+ case <-sp.stopCh:
+ watchTout.Stop()
+ return
+ // stop here
+ case <-watchTout.C:
+ sp.control()
+ }
+ }
+ }()
+}
+
+func (sp *supervisedPool) Stop() {
+ sp.stopCh <- struct{}{}
+}
+
+func (sp *supervisedPool) control() {
+ now := time.Now()
+ ctx := context.TODO()
+
+ // THIS IS A COPY OF WORKERS
+ workers := sp.pool.Workers()
+
+ for i := 0; i < len(workers); i++ {
+ if workers[i].State().Value() == StateInvalid {
+ continue
+ }
+
+ s, err := WorkerProcessState(workers[i])
+ if err != nil {
+ // worker not longer valid for supervision
+ continue
+ }
+
+ if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
+ err = sp.pool.RemoveWorker(ctx, workers[i])
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ return
+ } else {
+ sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ }
+
+ continue
+ }
+
+ if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
+ err = sp.pool.RemoveWorker(ctx, workers[i])
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ return
+ } else {
+ sp.events.Push(PoolEvent{Event: EventTTL, Payload: workers[i]})
+ }
+
+ continue
+ }
+
+ // firs we check maxWorker idle
+ if sp.cfg.IdleTTL != 0 {
+ // then check for the worker state
+ if workers[i].State().Value() != StateReady {
+ continue
+ }
+
+ /*
+ Calculate idle time
+ If worker in the StateReady, we read it LastUsed timestamp as UnixNano uint64
+ 2. For example maxWorkerIdle is equal to 5sec, then, if (time.Now - LastUsed) > maxWorkerIdle
+ we are guessing that worker overlap idle time and has to be killed
+ */
+
+ // get last used unix nano
+ lu := workers[i].State().LastUsed()
+
+ // convert last used to unixNano and sub time.now
+ res := int64(lu) - now.UnixNano()
+
+ // maxWorkerIdle more than diff between now and last used
+ if sp.cfg.IdleTTL-res <= 0 {
+ err = sp.pool.RemoveWorker(ctx, workers[i])
+ if err != nil {
+ sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: err})
+ return
+ } else {
+ sp.events.Push(PoolEvent{Event: EventIdleTTL, Payload: workers[i]})
+ }
+ }
+ }
+ }
+}
diff --git a/sync_worker.go b/sync_worker.go
index de9491d6..d7c15e88 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -5,6 +5,8 @@ import (
"fmt"
"time"
+ "github.com/spiral/roadrunner/v2/util"
+
"github.com/pkg/errors"
"github.com/spiral/goridge/v2"
)
@@ -14,90 +16,24 @@ var EmptyPayload = Payload{}
type SyncWorker interface {
// WorkerBase provides basic functionality for the SyncWorker
WorkerBase
+
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs Payload) (Payload, error)
-
- // ExecWithContext allow to set ExecTTL
- ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
}
-type taskWorker struct {
+type syncWorker struct {
w WorkerBase
}
func NewSyncWorker(w WorkerBase) (SyncWorker, error) {
- return &taskWorker{
+ return &syncWorker{
w: w,
}, nil
}
-type twexec struct {
- payload Payload
- err error
-}
-
-func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
- c := make(chan twexec)
- go func() {
- if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
- c <- twexec{
- payload: EmptyPayload,
- err: fmt.Errorf("payload can not be empty"),
- }
- return
- }
-
- if tw.w.State().Value() != StateReady {
- c <- twexec{
- payload: EmptyPayload,
- err: fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()),
- }
- return
- }
-
- // set last used time
- tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
- tw.w.State().Set(StateWorking)
-
- rsp, err := tw.execPayload(rqs)
- if err != nil {
- if _, ok := err.(TaskError); !ok {
- tw.w.State().Set(StateErrored)
- tw.w.State().RegisterExec()
- }
- c <- twexec{
- payload: EmptyPayload,
- err: err,
- }
- return
- }
-
- tw.w.State().Set(StateReady)
- tw.w.State().RegisterExec()
- c <- twexec{
- payload: rsp,
- err: nil,
- }
- return
- }()
-
- for {
- select {
- case <-ctx.Done():
- return EmptyPayload, ctx.Err()
- case res := <-c:
- if res.err != nil {
- return EmptyPayload, res.err
- }
-
- return res.payload, nil
- }
- }
-}
-
-//
-func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
- if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
+// Exec payload without TTL timeout.
+func (tw *syncWorker) Exec(p Payload) (Payload, error) {
+ if len(p.Body) == 0 && len(p.Context) == 0 {
return EmptyPayload, fmt.Errorf("payload can not be empty")
}
@@ -109,9 +45,9 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
tw.w.State().Set(StateWorking)
- rsp, err := tw.execPayload(rqs)
+ rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(TaskError); !ok {
+ if _, ok := err.(JobError); !ok {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}
@@ -124,7 +60,7 @@ func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
return rsp, nil
}
-func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
+func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) {
// two things; todo: merge
if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {
return EmptyPayload, errors.Wrap(err, "header error")
@@ -147,7 +83,7 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
}
if pr.HasFlag(goridge.PayloadError) {
- return EmptyPayload, TaskError(rsp.Context)
+ return EmptyPayload, JobError(rsp.Context)
}
// add streaming support :)
@@ -158,46 +94,46 @@ func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
return rsp, nil
}
-func (tw *taskWorker) String() string {
+func (tw *syncWorker) String() string {
return tw.w.String()
}
-func (tw *taskWorker) Created() time.Time {
- return tw.w.Created()
+func (tw *syncWorker) Pid() int64 {
+ return tw.w.Pid()
}
-func (tw *taskWorker) Events() <-chan WorkerEvent {
- return tw.w.Events()
+func (tw *syncWorker) Created() time.Time {
+ return tw.w.Created()
}
-func (tw *taskWorker) Pid() int64 {
- return tw.w.Pid()
+func (tw *syncWorker) AddListener(listener util.EventListener) {
+ tw.w.AddListener(listener)
}
-func (tw *taskWorker) State() State {
+func (tw *syncWorker) State() State {
return tw.w.State()
}
-func (tw *taskWorker) Start() error {
+func (tw *syncWorker) Start() error {
return tw.w.Start()
}
-func (tw *taskWorker) Wait(ctx context.Context) error {
+func (tw *syncWorker) Wait(ctx context.Context) error {
return tw.w.Wait(ctx)
}
-func (tw *taskWorker) Stop(ctx context.Context) error {
+func (tw *syncWorker) Stop(ctx context.Context) error {
return tw.w.Stop(ctx)
}
-func (tw *taskWorker) Kill(ctx context.Context) error {
+func (tw *syncWorker) Kill(ctx context.Context) error {
return tw.w.Kill(ctx)
}
-func (tw *taskWorker) Relay() goridge.Relay {
+func (tw *syncWorker) Relay() goridge.Relay {
return tw.w.Relay()
}
-func (tw *taskWorker) AttachRelay(rl goridge.Relay) {
+func (tw *syncWorker) AttachRelay(rl goridge.Relay) {
tw.w.AttachRelay(rl)
}
diff --git a/sync_worker_test.go b/sync_worker_test.go
index f4868009..7f969283 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -2,11 +2,9 @@ package roadrunner
import (
"context"
- "errors"
"os/exec"
"sync"
"testing"
- "time"
"github.com/stretchr/testify/assert"
)
@@ -34,7 +32,7 @@ func Test_Echo(t *testing.T) {
}
}()
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -65,7 +63,7 @@ func Test_BadPayload(t *testing.T) {
}
}()
- res, err := syncWorker.ExecWithContext(ctx, EmptyPayload)
+ res, err := syncWorker.Exec(EmptyPayload)
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -84,7 +82,6 @@ func Test_NotStarted_String(t *testing.T) {
}
func Test_NotStarted_Exec(t *testing.T) {
- ctx := context.Background()
cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
w, _ := InitBaseWorker(cmd)
@@ -94,7 +91,7 @@ func Test_NotStarted_Exec(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -143,7 +140,7 @@ func Test_Echo_Slow(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -164,28 +161,17 @@ func Test_Broken(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
- go func() {
- assert.NotNil(t, w)
- tt := time.NewTimer(time.Second * 10)
- defer wg.Done()
- for {
- select {
- case ev := <-w.Events():
- assert.Contains(t, string(ev.Payload.([]byte)), "undefined_function()")
- return
- case <-tt.C:
- assert.Error(t, errors.New("no events from worker"))
- return
- }
- }
- }()
+ w.AddListener(func(event interface{}) {
+ assert.Contains(t, string(event.(WorkerEvent).Payload.([]byte)), "undefined_function()")
+ wg.Done()
+ })
syncWorker, err := NewSyncWorker(w)
if err != nil {
t.Fatal(err)
}
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -215,12 +201,12 @@ func Test_Error(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.Exec(Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
- assert.IsType(t, TaskError{}, err)
+ assert.IsType(t, JobError{}, err)
assert.Equal(t, "hello", err.Error())
}
@@ -244,19 +230,19 @@ func Test_NumExecs(t *testing.T) {
t.Fatal(err)
}
- _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
+ _, err = syncWorker.Exec(Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/util/events.go b/util/events.go
new file mode 100755
index 00000000..9e12c4f7
--- /dev/null
+++ b/util/events.go
@@ -0,0 +1,26 @@
+package util
+
+// Event listener listens for the events produced by worker, worker pool or other servce.
+type EventListener func(event interface{})
+
+// EventHandler helps to broadcast events to multiple listeners.
+type EventHandler struct {
+ listeners []EventListener
+}
+
+// NumListeners returns number of event listeners.
+func (eb *EventHandler) NumListeners() int {
+ return len(eb.listeners)
+}
+
+// AddListener registers new event listener.
+func (eb *EventHandler) AddListener(listener EventListener) {
+ eb.listeners = append(eb.listeners, listener)
+}
+
+// Push broadcast events across all event listeners.
+func (eb *EventHandler) Push(e interface{}) {
+ for _, listener := range eb.listeners {
+ listener(e)
+ }
+}
diff --git a/worker.go b/worker.go
index c0a735c2..2dda51cc 100755
--- a/worker.go
+++ b/worker.go
@@ -11,10 +11,18 @@ import (
"sync"
"time"
+ "github.com/spiral/roadrunner/v2/util"
+
"github.com/spiral/goridge/v2"
"go.uber.org/multierr"
)
+const (
+ // WaitDuration - for how long error buffer should attempt to aggregate error messages
+ // before merging output together since lastError update (required to keep error update together).
+ WaitDuration = 25 * time.Millisecond
+)
+
// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
const (
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
@@ -22,38 +30,31 @@ const (
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
EventWorkerLog
-
- // EventWorkerWaitDone triggered when worker exit from process Wait
- EventWorkerWaitDone // todo: implemented?
-
- EventWorkerBufferClosed
-
- EventRelayCloseError
-
- EventWorkerProcessError
)
-const (
- // WaitDuration - for how long error buffer should attempt to aggregate error messages
- // before merging output together since lastError update (required to keep error update together).
- WaitDuration = 100 * time.Millisecond
-)
-
-// todo: write comment
+// WorkerEvent wraps worker events.
type WorkerEvent struct {
- Event int64
- Worker WorkerBase
+ // Event id, see below.
+ Event int64
+
+ // Worker triggered the event.
+ Worker WorkerBase
+
+ // Event specific payload.
Payload interface{}
}
type WorkerBase interface {
fmt.Stringer
- Created() time.Time
+ // Pid returns worker pid.
+ Pid() int64
- Events() <-chan WorkerEvent
+ // Created returns time worker was created at.
+ Created() time.Time
- Pid() int64
+ // AddListener attaches listener to consume worker events.
+ AddListener(listener util.EventListener)
// State return receive-only WorkerProcess state object, state can be used to safely access
// WorkerProcess status, time when status changed and number of WorkerProcess executions.
@@ -88,7 +89,7 @@ type WorkerProcess struct {
created time.Time
// updates parent supervisor or pool about WorkerProcess events
- events chan WorkerEvent
+ events *util.EventHandler
// state holds information about current WorkerProcess state,
// number of WorkerProcess executions, buf status change time.
@@ -129,7 +130,7 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
}
w := &WorkerProcess{
created: time.Now(),
- events: make(chan WorkerEvent, 10),
+ events: &util.EventHandler{},
cmd: cmd,
state: newState(StateInactive),
}
@@ -142,12 +143,23 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
return w, nil
}
+// Pid returns worker pid.
+func (w *WorkerProcess) Pid() int64 {
+ return int64(w.pid)
+}
+
+// Created returns time worker was created at.
func (w *WorkerProcess) Created() time.Time {
return w.created
}
-func (w *WorkerProcess) Pid() int64 {
- return int64(w.pid)
+// AddListener registers new worker event listener.
+func (w *WorkerProcess) AddListener(listener util.EventListener) {
+ w.events.AddListener(listener)
+
+ w.errBuffer.mu.Lock()
+ w.errBuffer.enable = true
+ w.errBuffer.mu.Unlock()
}
// State return receive-only WorkerProcess state object, state can be used to safely access
@@ -195,10 +207,6 @@ func (w *WorkerProcess) Start() error {
return nil
}
-func (w *WorkerProcess) Events() <-chan WorkerEvent {
- return w.events
-}
-
// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
// complete and will return process error (if any), if stderr is presented it's value
// will be wrapped as WorkerError. Method will return error code if php process fails
@@ -208,15 +216,8 @@ func (w *WorkerProcess) Wait(ctx context.Context) error {
w.endState = w.cmd.ProcessState
if err != nil {
w.state.Set(StateErrored)
- // if there are messages in the events channel, read it
- // TODO potentially danger place
- if len(w.events) > 0 {
- select {
- case ev := <-w.events:
- err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
- }
- }
- // if no errors in the events, error might be in the errbuffer
+
+ // if no errors in the events, error might be in the errBuffer
if w.errBuffer.Len() > 0 {
err = multierr.Append(err, errors.New(w.errBuffer.String()))
}
@@ -250,6 +251,7 @@ func (w *WorkerProcess) closeRelay() error {
// Stop sends soft termination command to the WorkerProcess and waits for process completion.
func (w *WorkerProcess) Stop(ctx context.Context) error {
c := make(chan error)
+
go func() {
var err error
w.errBuffer.Close()
@@ -264,6 +266,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error {
w.state.Set(StateStopped)
c <- nil
}()
+
select {
case <-ctx.Done():
return ctx.Err()
@@ -290,16 +293,17 @@ func (w *WorkerProcess) Kill(ctx context.Context) error {
}
func (w *WorkerProcess) logCallback(log []byte) {
- w.events <- WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log})
}
// thread safe errBuffer
type errBuffer struct {
- mu sync.RWMutex
- buf []byte
- last int
- wait *time.Timer
- // todo remove update
+ enable bool
+ mu sync.RWMutex
+ buf []byte
+ last int
+ wait *time.Timer
+ // todo: remove update
update chan interface{}
stop chan interface{}
logCallback func(log []byte)
@@ -321,7 +325,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer {
eb.wait.Reset(WaitDuration)
case <-eb.wait.C:
eb.mu.Lock()
- if len(eb.buf) > eb.last {
+ if eb.enable && len(eb.buf) > eb.last {
eb.logCallback(eb.buf[eb.last:])
eb.buf = eb.buf[0:0]
eb.last = len(eb.buf)
@@ -331,11 +335,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer {
eb.wait.Stop()
eb.mu.Lock()
- if len(eb.buf) > eb.last {
- if eb == nil || eb.logCallback == nil {
- eb.mu.Unlock()
- return
- }
+ if eb.enable && len(eb.buf) > eb.last {
eb.logCallback(eb.buf[eb.last:])
eb.last = len(eb.buf)
}
diff --git a/worker_test.go b/worker_test.go
index a90b7ef2..d2744345 100755
--- a/worker_test.go
+++ b/worker_test.go
@@ -91,6 +91,7 @@ func TestErrBuffer_Write_Event(t *testing.T) {
assert.Equal(t, []byte("hello\n"), log)
wg.Done()
}
+ buf.enable = true
_, err := buf.Write([]byte("hello\n"))
if err != nil {
@@ -116,6 +117,8 @@ func TestErrBuffer_Write_Event_Separated(t *testing.T) {
assert.Equal(t, []byte("hello\nending"), log)
wg.Done()
}
+ buf.enable = true
+
_, err := buf.Write([]byte("hel"))
if err != nil {
t.Errorf("fail to write: error %v", err)
diff --git a/workers_watcher.go b/worker_watcher.go
index d9d27196..25c88a1a 100755
--- a/workers_watcher.go
+++ b/worker_watcher.go
@@ -5,6 +5,8 @@ import (
"errors"
"sync"
"time"
+
+ "github.com/spiral/roadrunner/v2/util"
)
var ErrWatcherStopped = errors.New("watcher stopped")
@@ -59,36 +61,36 @@ func (stack *Stack) Pop() (WorkerBase, bool) {
return w, false
}
-type WorkersWatcher struct {
- mutex sync.RWMutex
- stack *Stack
- allocator func(args ...interface{}) (WorkerBase, error)
- initialNumWorkers int64
- actualNumWorkers int64
- events chan PoolEvent
-}
-
type WorkerWatcher interface {
// AddToWatch used to add stack to wait its state
AddToWatch(ctx context.Context, workers []WorkerBase) error
+
// GetFreeWorker provide first free worker
GetFreeWorker(ctx context.Context) (WorkerBase, error)
+
// PutWorker enqueues worker back
PushWorker(w WorkerBase)
+
// AllocateNew used to allocate new worker and put in into the WorkerWatcher
AllocateNew(ctx context.Context) error
+
// Destroy destroys the underlying stack
Destroy(ctx context.Context)
+
// WorkersList return all stack w/o removing it from internal storage
WorkersList() []WorkerBase
+
// RemoveWorker remove worker from the stack
RemoveWorker(ctx context.Context, wb WorkerBase) error
}
// workerCreateFunc can be nil, but in that case, dead stack will not be replaced
-func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher {
- // todo check if events not nil
- ww := &WorkersWatcher{
+func newWorkerWatcher(
+ allocator func(args ...interface{}) (WorkerBase, error),
+ numWorkers int64,
+ events *util.EventHandler,
+) *workerWatcher {
+ ww := &workerWatcher{
stack: NewWorkersStack(),
allocator: allocator,
initialNumWorkers: numWorkers,
@@ -99,14 +101,23 @@ func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), n
return ww
}
-func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {
+type workerWatcher struct {
+ mutex sync.RWMutex
+ stack *Stack
+ allocator func(args ...interface{}) (WorkerBase, error)
+ initialNumWorkers int64
+ actualNumWorkers int64
+ events *util.EventHandler
+}
+
+func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {
for i := 0; i < len(workers); i++ {
sw, err := NewSyncWorker(workers[i])
if err != nil {
return err
}
ww.stack.Push(sw)
- ww.watch(sw)
+ sw.AddListener(ww.events.Push)
go func(swc WorkerBase) {
ww.wait(ctx, swc)
@@ -115,12 +126,13 @@ func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase)
return nil
}
-func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
+func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
// thread safe operation
w, stop := ww.stack.Pop()
if stop {
return nil, ErrWatcherStopped
}
+
// handle worker remove state
// in this state worker is destroyed by supervisor
if w != nil && w.State().Value() == StateRemove {
@@ -131,6 +143,7 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
// try to get next
return ww.GetFreeWorker(ctx)
}
+
// no free stack
if w == nil {
tout := time.NewTicker(time.Second * 180)
@@ -152,23 +165,31 @@ func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
}
}
}
+
ww.decreaseNumOfActualWorkers()
return w, nil
}
-func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error {
+func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
ww.stack.mutex.Lock()
sw, err := ww.allocator()
if err != nil {
return err
}
+
ww.addToWatch(sw)
ww.stack.mutex.Unlock()
ww.PushWorker(sw)
+
+ ww.events.Push(PoolEvent{
+ Event: EventWorkerConstruct,
+ Payload: sw,
+ })
+
return nil
}
-func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
ww.stack.mutex.Lock()
defer ww.stack.mutex.Unlock()
pid := wb.Pid()
@@ -193,19 +214,19 @@ func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error
}
// O(1) operation
-func (ww *WorkersWatcher) PushWorker(w WorkerBase) {
+func (ww *workerWatcher) PushWorker(w WorkerBase) {
ww.mutex.Lock()
ww.actualNumWorkers++
ww.mutex.Unlock()
ww.stack.Push(w)
}
-func (ww *WorkersWatcher) ReduceWorkersCount() {
+func (ww *workerWatcher) ReduceWorkersCount() {
ww.decreaseNumOfActualWorkers()
}
// Destroy all underlying stack (but let them to complete the task)
-func (ww *WorkersWatcher) Destroy(ctx context.Context) {
+func (ww *workerWatcher) Destroy(ctx context.Context) {
ww.stack.mutex.Lock()
ww.stack.destroy = true
ww.stack.mutex.Unlock()
@@ -238,67 +259,62 @@ func (ww *WorkersWatcher) Destroy(ctx context.Context) {
}
// Warning, this is O(n) operation
-func (ww *WorkersWatcher) WorkersList() []WorkerBase {
+func (ww *workerWatcher) WorkersList() []WorkerBase {
return ww.stack.workers
}
-func (ww *WorkersWatcher) wait(ctx context.Context, w WorkerBase) {
+func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
err := w.Wait(ctx)
if err != nil {
- ww.events <- PoolEvent{Payload: WorkerEvent{
+ ww.events.Push(WorkerEvent{
Event: EventWorkerError,
Worker: w,
Payload: err,
- }}
+ })
}
- // If not destroyed, reallocate
- if w.State().Value() != StateDestroyed {
- pid := w.Pid()
- ww.stack.mutex.Lock()
- for i := 0; i < len(ww.stack.workers); i++ {
- // worker in the stack, reallocating
- if ww.stack.workers[i].Pid() == pid {
- ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
- ww.decreaseNumOfActualWorkers()
- ww.stack.mutex.Unlock()
- err = ww.AllocateNew(ctx)
- if err != nil {
- ww.events <- PoolEvent{Payload: WorkerEvent{
- Event: EventWorkerError,
- Worker: w,
- Payload: err,
- }}
- return
- }
- ww.events <- PoolEvent{Payload: WorkerEvent{
- Event: EventWorkerConstruct,
- Worker: nil,
- Payload: nil,
- }}
- return
+
+ if w.State().Value() == StateDestroyed {
+ // worker was manually destroyed, no need to replace
+ return
+ }
+
+ pid := w.Pid()
+ ww.stack.mutex.Lock()
+ for i := 0; i < len(ww.stack.workers); i++ {
+ // worker in the stack, reallocating
+ if ww.stack.workers[i].Pid() == pid {
+ ww.stack.workers = append(ww.stack.workers[:i], ww.stack.workers[i+1:]...)
+ ww.decreaseNumOfActualWorkers()
+ ww.stack.mutex.Unlock()
+
+ err = ww.AllocateNew(ctx)
+ if err != nil {
+ ww.events.Push(PoolEvent{
+ Event: EventPoolError,
+ Payload: err,
+ })
}
- }
- ww.stack.mutex.Unlock()
- // worker not in the stack (not returned), forget and allocate new
- err = ww.AllocateNew(ctx)
- if err != nil {
- ww.events <- PoolEvent{Payload: WorkerEvent{
- Event: EventWorkerError,
- Worker: w,
- Payload: err,
- }}
+
return
}
- ww.events <- PoolEvent{Payload: WorkerEvent{
- Event: EventWorkerConstruct,
- Worker: nil,
- Payload: nil,
- }}
}
+
+ ww.stack.mutex.Unlock()
+
+ // worker not in the stack (not returned), forget and allocate new
+ err = ww.AllocateNew(ctx)
+ if err != nil {
+ ww.events.Push(PoolEvent{
+ Event: EventPoolError,
+ Payload: err,
+ })
+ return
+ }
+
return
}
-func (ww *WorkersWatcher) addToWatch(wb WorkerBase) {
+func (ww *workerWatcher) addToWatch(wb WorkerBase) {
ww.mutex.Lock()
defer ww.mutex.Unlock()
go func() {
@@ -306,18 +322,8 @@ func (ww *WorkersWatcher) addToWatch(wb WorkerBase) {
}()
}
-func (ww *WorkersWatcher) decreaseNumOfActualWorkers() {
+func (ww *workerWatcher) decreaseNumOfActualWorkers() {
ww.mutex.Lock()
ww.actualNumWorkers--
ww.mutex.Unlock()
}
-
-func (ww *WorkersWatcher) watch(swc WorkerBase) {
- // todo make event to stop function
- go func() {
- select {
- case ev := <-swc.Events():
- ww.events <- PoolEvent{Payload: ev}
- }
- }()
-}