summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-19 14:01:59 +0300
committerValery Piashchynski <[email protected]>2020-10-19 14:01:59 +0300
commit77670fb7af0c892c9b3a589fd424534fad288e7a (patch)
tree3adcaa85db664a355abe2b28f1d7e4a3fc45689f
parent16fbf3104c3c34bd9355593052b686acd26a8efe (diff)
Update according activity worker
-rw-r--r--go.mod1
-rw-r--r--go.sum3
-rw-r--r--pipe_factory_test.go47
-rw-r--r--plugins/config/provider.go4
-rw-r--r--plugins/config/tests/plugin1.go9
-rw-r--r--plugins/config/viper.go9
-rw-r--r--plugins/factory/app.go39
-rw-r--r--plugins/factory/factory.go10
-rw-r--r--plugins/factory/tests/factory_test.go1
-rw-r--r--plugins/factory/tests/plugin_1.go4
-rw-r--r--pool.go49
-rw-r--r--pool_supervisor.go2
-rw-r--r--pool_test.go53
-rw-r--r--socket_factory_test.go12
-rw-r--r--static_pool.go77
-rw-r--r--static_pool_test.go40
-rw-r--r--sync_worker.go39
-rw-r--r--sync_worker_test.go18
-rw-r--r--util/doc.go2
-rw-r--r--util/isolate_win.go2
-rw-r--r--util/network_windows_test.go2
-rw-r--r--worker.go6
22 files changed, 244 insertions, 185 deletions
diff --git a/go.mod b/go.mod
index e65a3f3a..ddf0fe98 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.15
require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
+ github.com/fatih/color v1.7.0
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/json-iterator/go v1.1.10
github.com/pkg/errors v0.9.1
diff --git a/go.sum b/go.sum
index 851bacb2..72ca37a7 100644
--- a/go.sum
+++ b/go.sum
@@ -40,6 +40,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
+github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@@ -119,7 +120,9 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
+github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
+github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
index 93d9ccd8..4eda21a6 100644
--- a/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -101,7 +101,7 @@ func Test_Pipe_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -129,7 +129,7 @@ func Test_Pipe_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -178,7 +178,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}()
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(context.Background(), Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.ExecWithContext(context.Background(), Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -192,18 +192,6 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
b.Fatal(err)
}
- //go func() {
- // for {
- // select {
- // case event := <-w.Events():
- // b.Fatal(event)
- // }
- // }
- // //err := w.Wait()
- // //if err != nil {
- // // b.Errorf("error waiting the WorkerProcess: error %v", err)
- // //}
- //}()
defer func() {
err = w.Stop(ctx)
if err != nil {
@@ -217,8 +205,35 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
}
+
+func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ ctx := context.Background()
+ w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ defer func() {
+ err = w.Stop(ctx)
+ if err != nil {
+ b.Errorf("error stopping the WorkerProcess: error %v", err)
+ }
+ }()
+
+ sw, err := NewSyncWorker(w)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ for n := 0; n < b.N; n++ {
+ if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil {
+ b.Fail()
+ }
+ }
+} \ No newline at end of file
diff --git a/plugins/config/provider.go b/plugins/config/provider.go
index bec417e9..580231fd 100644
--- a/plugins/config/provider.go
+++ b/plugins/config/provider.go
@@ -10,6 +10,10 @@ type Provider interface {
// }
// }
UnmarshalKey(name string, out interface{}) error
+
// Get used to get config section
Get(name string) interface{}
+
+ // Has checks if config section exists.
+ Has(name string) bool
}
diff --git a/plugins/config/tests/plugin1.go b/plugins/config/tests/plugin1.go
index 7573dc82..7c5f2afd 100644
--- a/plugins/config/tests/plugin1.go
+++ b/plugins/config/tests/plugin1.go
@@ -15,18 +15,17 @@ type ReloadConfig struct {
}
type ServiceConfig struct {
- Enabled bool
+ Enabled bool
Recursive bool
- Patterns []string
- Dirs []string
- Ignore []string
+ Patterns []string
+ Dirs []string
+ Ignore []string
}
type Foo struct {
configProvider config.Provider
}
-
// Depends on S2 and DB (S3 in the current case)
func (f *Foo) Init(p config.Provider) error {
f.configProvider = p
diff --git a/plugins/config/viper.go b/plugins/config/viper.go
index 0362e79b..b276dbe2 100644
--- a/plugins/config/viper.go
+++ b/plugins/config/viper.go
@@ -17,17 +17,21 @@ type ViperProvider struct {
//////// ENDURE //////////
func (v *ViperProvider) Init() error {
v.viper = viper.New()
+
// read in environment variables that match
v.viper.AutomaticEnv()
if v.Prefix == "" {
return errors.New("prefix should be set")
}
+
v.viper.SetEnvPrefix(v.Prefix)
if v.Path == "" {
return errors.New("path should be set")
}
+
v.viper.SetConfigFile(v.Path)
v.viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
+
return v.viper.ReadInConfig()
}
@@ -62,6 +66,11 @@ func (v *ViperProvider) Get(name string) interface{} {
return v.viper.Get(name)
}
+// Has checks if config section exists.
+func (v *ViperProvider) Has(name string) bool {
+ return v.viper.IsSet(name)
+}
+
/////////// PRIVATE //////////////
func parseFlag(flag string) (string, string, error) {
diff --git a/plugins/factory/app.go b/plugins/factory/app.go
index f9e7944c..753ca2a9 100644
--- a/plugins/factory/app.go
+++ b/plugins/factory/app.go
@@ -1,7 +1,6 @@
package factory
import (
- "context"
"errors"
"fmt"
"os"
@@ -21,11 +20,10 @@ type AppConfig struct {
Group string
Env Env
- Relay string
// Listen defines connection method and factory to be used to connect to workers:
// "pipes", "tcp://:6001", "unix://rr.sock"
// This config section must not change on re-configuration.
- Listen string
+ Relay string
// RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
// must not change on re-configuration.
@@ -33,23 +31,28 @@ type AppConfig struct {
}
type App struct {
- cfg *AppConfig
+ cfg AppConfig
configProvider config.Provider
factory roadrunner.Factory
}
func (app *App) Init(provider config.Provider) error {
- app.cfg = &AppConfig{}
+ app.cfg = AppConfig{}
app.configProvider = provider
return nil
}
func (app *App) Configure() error {
- err := app.configProvider.UnmarshalKey("app", app.cfg)
+ err := app.configProvider.UnmarshalKey("app", &app.cfg)
if err != nil {
return err
}
+
+ if app.cfg.Relay == "" {
+ app.cfg.Relay = "pipes"
+ }
+
return nil
}
@@ -83,30 +86,28 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) {
// todo ENV unused
func (app *App) NewFactory(env Env) (roadrunner.Factory, error) {
- // if Listen is empty or doesn't contain separator, return error
- if app.cfg.Listen == "" || !strings.Contains(app.cfg.Listen, "://") {
- return nil, errors.New("relay should be set")
- }
-
- lsn, err := util.CreateListener(app.cfg.Listen)
- if err != nil {
- return nil, err
+ if app.cfg.Relay == "" || app.cfg.Relay == "pipes" {
+ return roadrunner.NewPipeFactory(), nil
}
- dsn := strings.Split(app.cfg.Listen, "://")
+ dsn := strings.Split(app.cfg.Relay, "://")
if len(dsn) != 2 {
return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
}
+ lsn, err := util.CreateListener(app.cfg.Relay)
+ if err != nil {
+ return nil, err
+ }
+
switch dsn[0] {
// sockets group
case "unix":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
case "tcp":
return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil
- // pipes
default:
- return roadrunner.NewPipeFactory(), nil
+ return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)")
}
}
@@ -116,10 +117,6 @@ func (app *App) Serve() chan error {
}
func (app *App) Stop() error {
- err := app.factory.Close(context.Background())
- if err != nil {
- return err
- }
return nil
}
diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go
index c5490cd6..5d80682d 100644
--- a/plugins/factory/factory.go
+++ b/plugins/factory/factory.go
@@ -3,9 +3,11 @@ package factory
import (
"context"
- "github.com/spiral/roadrunner/v2/plugins/events"
+ "log"
+ "github.com/fatih/color"
"github.com/spiral/roadrunner/v2"
+ "github.com/spiral/roadrunner/v2/plugins/events"
)
type WorkerFactory interface {
@@ -23,6 +25,7 @@ func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, e
if err != nil {
return nil, err
}
+
factory, err := wf.spw.NewFactory(env)
if err != nil {
return nil, err
@@ -37,6 +40,11 @@ func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, e
go func() {
for e := range p.Events() {
wf.eb.Push(e)
+ if we, ok := e.Payload.(roadrunner.WorkerEvent); ok {
+ if we.Event == roadrunner.EventWorkerLog {
+ log.Print(color.YellowString(string(we.Payload.([]byte))))
+ }
+ }
}
}()
diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go
index 38e939e1..72e28f84 100644
--- a/plugins/factory/tests/factory_test.go
+++ b/plugins/factory/tests/factory_test.go
@@ -46,7 +46,6 @@ func TestFactory(t *testing.T) {
t.Fatal(err)
}
-
err = container.Init()
if err != nil {
t.Fatal(err)
diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go
index 0c44a0d1..5ab6df73 100644
--- a/plugins/factory/tests/plugin_1.go
+++ b/plugins/factory/tests/plugin_1.go
@@ -9,8 +9,8 @@ import (
)
type Foo struct {
- configProvider config.Provider
- spawner factory.Spawner
+ configProvider config.Provider
+ spawner factory.Spawner
}
func (f *Foo) Init(p config.Provider, spw factory.Spawner) error {
diff --git a/pool.go b/pool.go
index 67d092c0..9038c730 100644
--- a/pool.go
+++ b/pool.go
@@ -2,7 +2,6 @@ package roadrunner
import (
"context"
- "fmt"
"runtime"
"time"
)
@@ -47,7 +46,9 @@ type Pool interface {
Events() chan PoolEvent
// Exec one task with given payload and context, returns result or error.
- Exec(ctx context.Context, rqs Payload) (Payload, error)
+ ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
+
+ Exec(rqs Payload) (Payload, error)
// Workers returns worker list associated with the pool.
Workers() (workers []WorkerBase)
@@ -66,7 +67,7 @@ type Pool interface {
//
type Config struct {
// NumWorkers defines how many sub-processes can be run at once. This value
- // might be doubled by Swapper while hot-swap.
+ // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
NumWorkers int64
// MaxJobs defines how many executions is allowed for the worker until
@@ -75,17 +76,17 @@ type Config struct {
MaxJobs int64
// AllocateTimeout defines for how long pool will be waiting for a worker to
- // be freed to handle the task.
+ // be freed to handle the task. Defaults to 60s.
AllocateTimeout time.Duration
// DestroyTimeout defines for how long pool should be waiting for worker to
- // properly destroy, if timeout reached worker will be killed.
+ // properly destroy, if timeout reached worker will be killed. Defaults to 60s.
DestroyTimeout time.Duration
// TTL defines maximum time worker is allowed to live.
TTL int64
- // IdleTTL defines maximum duration worker can spend in idle mode.
+ // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
IdleTTL int64
// ExecTTL defines maximum lifetime per job.
@@ -94,45 +95,21 @@ type Config struct {
// MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes.
MaxPoolMemory uint64
+ // MaxWorkerMemory limits memory per worker.
MaxWorkerMemory uint64
-
- // config from limit plugin, combine TODO
- // single bootstrap TODO, bool
- // warmup one worker and start consume requests and then start the rest of the stack
-
- // max memory for pool
- // max ttl
- // max idle ttl
-
- // ATTACHER interface - delete
-}
-
-// InitDefaults allows to init blank config with pre-defined set of default values.
-func (cfg *Config) InitDefaults() error {
- cfg.AllocateTimeout = time.Minute
- cfg.DestroyTimeout = time.Minute
- cfg.NumWorkers = int64(runtime.NumCPU())
-
- return nil
}
-// Valid returns error if config not valid.
-func (cfg *Config) Valid() error {
+// InitDefaults enables default config values.
+func (cfg *Config) InitDefaults() {
if cfg.NumWorkers == 0 {
- return fmt.Errorf("pool.NumWorkers must be set")
+ cfg.NumWorkers = int64(runtime.NumCPU())
}
if cfg.AllocateTimeout == 0 {
- return fmt.Errorf("pool.AllocateTimeout must be set")
+ cfg.AllocateTimeout = time.Minute
}
if cfg.DestroyTimeout == 0 {
- return fmt.Errorf("pool.DestroyTimeout must be set")
+ cfg.DestroyTimeout = time.Minute
}
-
- if cfg.ExecTTL == 0 {
- return fmt.Errorf("pool.ExecTTL must be set")
- }
-
- return nil
}
diff --git a/pool_supervisor.go b/pool_supervisor.go
index 93afb8c6..73c1c5b7 100644
--- a/pool_supervisor.go
+++ b/pool_supervisor.go
@@ -47,9 +47,11 @@ func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, wa
// just set to a big number, 5GB
maxPoolMemory = 5000 * MB
}
+
if watchTimeout == 0 {
watchTimeout = 60
}
+
return &staticPoolSupervisor{
maxWorkerMemory: maxWorkerMemory,
maxPoolMemory: maxPoolMemory,
diff --git a/pool_test.go b/pool_test.go
deleted file mode 100644
index 998dd9d4..00000000
--- a/pool_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package roadrunner
-
-import (
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-)
-
-func Test_NumWorkers(t *testing.T) {
- cfg := Config{
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second * 10,
- }
- err := cfg.Valid()
-
- assert.NotNil(t, err)
- assert.Equal(t, "pool.NumWorkers must be set", err.Error())
-}
-
-func Test_NumWorkers_Default(t *testing.T) {
- cfg := Config{
- AllocateTimeout: time.Second,
- DestroyTimeout: time.Second * 10,
- ExecTTL: time.Second * 5,
- }
-
- assert.NoError(t, cfg.InitDefaults())
- err := cfg.Valid()
- assert.Nil(t, err)
-}
-
-func Test_AllocateTimeout(t *testing.T) {
- cfg := Config{
- NumWorkers: 10,
- DestroyTimeout: time.Second * 10,
- }
- err := cfg.Valid()
-
- assert.NotNil(t, err)
- assert.Equal(t, "pool.AllocateTimeout must be set", err.Error())
-}
-
-func Test_DestroyTimeout(t *testing.T) {
- cfg := Config{
- NumWorkers: 10,
- AllocateTimeout: time.Second,
- }
- err := cfg.Valid()
-
- assert.NotNil(t, err)
- assert.Equal(t, "pool.DestroyTimeout must be set", err.Error())
-}
diff --git a/socket_factory_test.go b/socket_factory_test.go
index 45443337..0c953b33 100644
--- a/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -206,7 +206,7 @@ func Test_Tcp_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -245,7 +245,7 @@ func Test_Tcp_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -390,7 +390,7 @@ func Test_Unix_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
@@ -433,7 +433,7 @@ func Test_Unix_Echo(t *testing.T) {
t.Fatal(err)
}
- res, err := sw.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -511,7 +511,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -579,7 +579,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
}
for n := 0; n < b.N; n++ {
- if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
diff --git a/static_pool.go b/static_pool.go
index 2e72864d..bc990da5 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -14,6 +14,8 @@ const (
StopRequest = "{\"stop\":true}"
)
+var bCtx = context.Background()
+
// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type StaticPool struct {
// pool behaviour
@@ -40,9 +42,7 @@ type PoolEvent struct {
// supervisor Supervisor, todo: think about it
// stack func() (WorkerBase, error),
func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) {
- if err := cfg.Valid(); err != nil {
- return nil, errors.Wrap(err, "config")
- }
+ cfg.InitDefaults()
p := &StaticPool{
cfg: cfg,
@@ -92,8 +92,63 @@ func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
return p.ww.RemoveWorker(ctx, wb)
}
+func (p *StaticPool) Exec(rqs Payload) (Payload, error) {
+ w, err := p.ww.GetFreeWorker(context.Background())
+ if err != nil && errors.Is(err, ErrWatcherStopped) {
+ return EmptyPayload, ErrWatcherStopped
+ } else if err != nil {
+ return EmptyPayload, err
+ }
+
+ sw := w.(SyncWorker)
+
+ rsp, err := sw.Exec(rqs)
+ if err != nil {
+ errJ := p.checkMaxJobs(bCtx, w)
+ if errJ != nil {
+ return EmptyPayload, fmt.Errorf("%v, %v", err, errJ)
+ }
+ // soft job errors are allowed
+ if _, jobError := err.(TaskError); jobError {
+ p.ww.PushWorker(w)
+ return EmptyPayload, err
+ }
+
+ sw.State().Set(StateInvalid)
+ errS := w.Stop(bCtx)
+ if errS != nil {
+ return EmptyPayload, fmt.Errorf("%v, %v", err, errS)
+ }
+
+ return EmptyPayload, err
+ }
+
+ // worker want's to be terminated
+ if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ w.State().Set(StateInvalid)
+ err = w.Stop(bCtx)
+ if err != nil {
+ return EmptyPayload, err
+ }
+ return p.ExecWithContext(bCtx, rqs)
+ }
+
+ if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
+ err = p.ww.AllocateNew(bCtx)
+ if err != nil {
+ return EmptyPayload, err
+ }
+ } else {
+ p.muw.Lock()
+ p.ww.PushWorker(w)
+ p.muw.Unlock()
+ }
+ return rsp, nil
+}
+
// Exec one task with given payload and context, returns result or error.
-func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
+func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+ // todo: why TODO passed here?
getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout)
defer cancel()
w, err := p.ww.GetFreeWorker(getWorkerCtx)
@@ -105,10 +160,16 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
sw := w.(SyncWorker)
- execCtx, cancel2 := context.WithTimeout(context.TODO(), p.cfg.ExecTTL)
- defer cancel2()
+ var execCtx context.Context
+ if p.cfg.ExecTTL != 0 {
+ var cancel2 context.CancelFunc
+ execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL)
+ defer cancel2()
+ } else {
+ execCtx = ctx
+ }
- rsp, err := sw.Exec(execCtx, rqs)
+ rsp, err := sw.ExecWithContext(execCtx, rqs)
if err != nil {
errJ := p.checkMaxJobs(ctx, w)
if errJ != nil {
@@ -136,7 +197,7 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) {
if err != nil {
return EmptyPayload, err
}
- return p.Exec(ctx, rqs)
+ return p.ExecWithContext(ctx, rqs)
}
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
diff --git a/static_pool_test.go b/static_pool_test.go
index b2ab4713..fd8124ac 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -50,7 +50,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
assert.Error(t, err)
}
-func Test_ConfigError(t *testing.T) {
+func Test_ConfigNoErrorInitDefaults(t *testing.T) {
p, err := NewPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
@@ -61,8 +61,8 @@ func Test_ConfigError(t *testing.T) {
},
)
- assert.Nil(t, p)
- assert.Error(t, err)
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
}
func Test_StaticPool_Echo(t *testing.T) {
@@ -79,7 +79,7 @@ func Test_StaticPool_Echo(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -103,7 +103,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(ctx, Payload{Body: []byte("hello"), Context: nil})
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: nil})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -127,7 +127,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(ctx, Payload{Body: []byte("hello"), Context: []byte("world")})
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: []byte("world")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -150,7 +150,7 @@ func Test_StaticPool_JobError(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -188,7 +188,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
}
}()
- res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
@@ -212,7 +212,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
assert.NotNil(t, p)
- res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -290,11 +290,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, _ := p.Exec(ctx, Payload{Body: []byte("hello")})
+ res, _ := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -337,14 +337,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))
- res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
if err != nil {
t.Fatal(err)
}
assert.Equal(t, lastPID, string(res.Body))
for i := 0; i < 10; i++ {
- res, err := p.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.NoError(t, err)
assert.NotNil(t, res)
@@ -375,7 +375,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
assert.NoError(t, err)
p.Destroy(ctx)
- _, err = p.Exec(ctx, Payload{Body: []byte("100")})
+ _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -398,7 +398,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.Exec(ctx, Payload{Body: []byte("100")})
+ _, err := p.ExecWithContext(ctx, Payload{Body: []byte("100")})
if err != nil {
t.Errorf("error executing payload: error %v", err)
}
@@ -406,7 +406,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
time.Sleep(time.Millisecond * 10)
p.Destroy(ctx)
- _, err = p.Exec(ctx, Payload{Body: []byte("100")})
+ _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")})
assert.Error(t, err)
}
@@ -433,7 +433,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
w.State().Set(StateErrored)
}
- _, err = p.Exec(ctx, Payload{Body: []byte("hello")})
+ _, err = p.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
}
@@ -472,7 +472,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
b.Fail()
}
}
@@ -499,7 +499,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
- if _, err := p.Exec(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
@@ -529,7 +529,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
- if _, err := p.Exec(ctx, Payload{Body: []byte("hello")}); err != nil {
+ if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil {
b.Fail()
log.Println(err)
}
diff --git a/sync_worker.go b/sync_worker.go
index 45629f3e..a6e1ed01 100644
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -14,8 +14,11 @@ var EmptyPayload = Payload{}
type SyncWorker interface {
// WorkerBase provides basic functionality for the SyncWorker
WorkerBase
- // Exec used to execute payload on the SyncWorker
- Exec(ctx context.Context, rqs Payload) (Payload, error)
+ // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
+ Exec(rqs Payload) (Payload, error)
+
+ // ExecWithContext allow to set ExecTTL
+ ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
}
type taskWorker struct {
@@ -33,7 +36,7 @@ type twexec struct {
err error
}
-func (tw *taskWorker) Exec(ctx context.Context, rqs Payload) (Payload, error) {
+func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
c := make(chan twexec)
go func() {
if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
@@ -92,6 +95,36 @@ func (tw *taskWorker) Exec(ctx context.Context, rqs Payload) (Payload, error) {
}
}
+//
+func (tw *taskWorker) Exec(rqs Payload) (Payload, error) {
+ if len(rqs.Body) == 0 && len(rqs.Context) == 0 {
+ return EmptyPayload, fmt.Errorf("payload can not be empty")
+ }
+
+ if tw.w.State().Value() != StateReady {
+ return EmptyPayload, fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())
+ }
+
+ // set last used time
+ tw.w.State().SetLastUsed(uint64(time.Now().UnixNano()))
+ tw.w.State().Set(StateWorking)
+
+ rsp, err := tw.execPayload(rqs)
+ if err != nil {
+ if _, ok := err.(TaskError); !ok {
+ tw.w.State().Set(StateErrored)
+ tw.w.State().RegisterExec()
+ }
+ return EmptyPayload, err
+ }
+
+ tw.w.State().Set(StateReady)
+ tw.w.State().RegisterExec()
+
+ return rsp, nil
+
+}
+
func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) {
// two things; todo: merge
if err := sendControl(tw.w.Relay(), rqs.Context); err != nil {
diff --git a/sync_worker_test.go b/sync_worker_test.go
index e1cec4b6..f4868009 100644
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -34,7 +34,7 @@ func Test_Echo(t *testing.T) {
}
}()
- res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -65,7 +65,7 @@ func Test_BadPayload(t *testing.T) {
}
}()
- res, err := syncWorker.Exec(ctx, EmptyPayload)
+ res, err := syncWorker.ExecWithContext(ctx, EmptyPayload)
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -94,7 +94,7 @@ func Test_NotStarted_Exec(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Body)
@@ -143,7 +143,7 @@ func Test_Echo_Slow(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.Nil(t, err)
assert.NotNil(t, res)
@@ -185,7 +185,7 @@ func Test_Broken(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -215,7 +215,7 @@ func Test_Error(t *testing.T) {
t.Fatal(err)
}
- res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
assert.NotNil(t, err)
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
@@ -244,19 +244,19 @@ func Test_NumExecs(t *testing.T) {
t.Fatal(err)
}
- _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(1), w.State().NumExecs())
- _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
assert.Equal(t, int64(2), w.State().NumExecs())
- _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")})
+ _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")})
if err != nil {
t.Errorf("fail to execute payload: error %v", err)
}
diff --git a/util/doc.go b/util/doc.go
index c6006de4..a3798715 100644
--- a/util/doc.go
+++ b/util/doc.go
@@ -2,4 +2,4 @@ package util
/*
This package should not contain roadrunner dependencies, only system or third-party
- */
+*/
diff --git a/util/isolate_win.go b/util/isolate_win.go
index 77674b3b..6756d59f 100644
--- a/util/isolate_win.go
+++ b/util/isolate_win.go
@@ -14,4 +14,4 @@ func IsolateProcess(cmd *exec.Cmd) {
func ExecuteFromUser(cmd *exec.Cmd, u string) error {
return nil
-} \ No newline at end of file
+}
diff --git a/util/network_windows_test.go b/util/network_windows_test.go
index a5a8064e..b6648ed0 100644
--- a/util/network_windows_test.go
+++ b/util/network_windows_test.go
@@ -13,4 +13,4 @@ func TestCreateListener(t *testing.T) {
_, err = CreateListener("aaa://192.168.0.1")
assert.Error(t, err, "Invalid Protocol (tcp://:6001, unix://file.sock)")
-} \ No newline at end of file
+}
diff --git a/worker.go b/worker.go
index 82bd99df..c0a735c2 100644
--- a/worker.go
+++ b/worker.go
@@ -24,7 +24,7 @@ const (
EventWorkerLog
// EventWorkerWaitDone triggered when worker exit from process Wait
- EventWorkerWaitDone
+ EventWorkerWaitDone // todo: implemented?
EventWorkerBufferClosed
@@ -61,6 +61,7 @@ type WorkerBase interface {
// Start used to run Cmd and immediately return
Start() error
+
// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
// complete and will return process error (if any), if stderr is presented it's value
// will be wrapped as WorkerError. Method will return error code if php process fails
@@ -69,11 +70,14 @@ type WorkerBase interface {
// Stop sends soft termination command to the WorkerProcess and waits for process completion.
Stop(ctx context.Context) error
+
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
Kill(ctx context.Context) error
+
// Relay returns attached to worker goridge relay
Relay() goridge.Relay
+
// AttachRelay used to attach goridge relay to the worker process
AttachRelay(rl goridge.Relay)
}