diff options
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 3 | ||||
-rw-r--r-- | pipe_factory_test.go | 47 | ||||
-rw-r--r-- | plugins/config/provider.go | 4 | ||||
-rw-r--r-- | plugins/config/tests/plugin1.go | 9 | ||||
-rw-r--r-- | plugins/config/viper.go | 9 | ||||
-rw-r--r-- | plugins/factory/app.go | 39 | ||||
-rw-r--r-- | plugins/factory/factory.go | 10 | ||||
-rw-r--r-- | plugins/factory/tests/factory_test.go | 1 | ||||
-rw-r--r-- | plugins/factory/tests/plugin_1.go | 4 | ||||
-rw-r--r-- | pool.go | 49 | ||||
-rw-r--r-- | pool_supervisor.go | 2 | ||||
-rw-r--r-- | pool_test.go | 53 | ||||
-rw-r--r-- | socket_factory_test.go | 12 | ||||
-rw-r--r-- | static_pool.go | 77 | ||||
-rw-r--r-- | static_pool_test.go | 40 | ||||
-rw-r--r-- | sync_worker.go | 39 | ||||
-rw-r--r-- | sync_worker_test.go | 18 | ||||
-rw-r--r-- | util/doc.go | 2 | ||||
-rw-r--r-- | util/isolate_win.go | 2 | ||||
-rw-r--r-- | util/network_windows_test.go | 2 | ||||
-rw-r--r-- | worker.go | 6 |
22 files changed, 244 insertions, 185 deletions
@@ -4,6 +4,7 @@ go 1.15 require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect + github.com/fatih/color v1.7.0 github.com/go-ole/go-ole v1.2.4 // indirect github.com/json-iterator/go v1.1.10 github.com/pkg/errors v0.9.1 @@ -40,6 +40,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -119,7 +120,9 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= diff --git a/pipe_factory_test.go b/pipe_factory_test.go index 93d9ccd8..4eda21a6 100644 --- a/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -101,7 +101,7 @@ func Test_Pipe_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -129,7 +129,7 @@ func Test_Pipe_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -178,7 +178,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { }() for n := 0; n < b.N; n++ { - if _, err := sw.Exec(context.Background(), Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.ExecWithContext(context.Background(), Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -192,18 +192,6 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { b.Fatal(err) } - //go func() { - // for { - // select { - // case event := <-w.Events(): - // b.Fatal(event) - // } - // } - // //err := w.Wait() - // //if err != nil { - // // b.Errorf("error waiting the WorkerProcess: error %v", err) - // //} - //}() defer func() { err = w.Stop(ctx) if err != nil { @@ -217,8 +205,35 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { b.Fail() } } } + +func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + ctx := context.Background() + w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + if err != nil { + b.Fatal(err) + } + + defer func() { + err = w.Stop(ctx) + if err != nil { + b.Errorf("error stopping the WorkerProcess: error %v", err) + } + }() + + sw, err := NewSyncWorker(w) + if err != nil { + b.Fatal(err) + } + + for n := 0; n < b.N; n++ { + if _, err := sw.Exec(Payload{Body: []byte("hello")}); err != nil { + b.Fail() + } + } +}
\ No newline at end of file diff --git a/plugins/config/provider.go b/plugins/config/provider.go index bec417e9..580231fd 100644 --- a/plugins/config/provider.go +++ b/plugins/config/provider.go @@ -10,6 +10,10 @@ type Provider interface { // } // } UnmarshalKey(name string, out interface{}) error + // Get used to get config section Get(name string) interface{} + + // Has checks if config section exists. + Has(name string) bool } diff --git a/plugins/config/tests/plugin1.go b/plugins/config/tests/plugin1.go index 7573dc82..7c5f2afd 100644 --- a/plugins/config/tests/plugin1.go +++ b/plugins/config/tests/plugin1.go @@ -15,18 +15,17 @@ type ReloadConfig struct { } type ServiceConfig struct { - Enabled bool + Enabled bool Recursive bool - Patterns []string - Dirs []string - Ignore []string + Patterns []string + Dirs []string + Ignore []string } type Foo struct { configProvider config.Provider } - // Depends on S2 and DB (S3 in the current case) func (f *Foo) Init(p config.Provider) error { f.configProvider = p diff --git a/plugins/config/viper.go b/plugins/config/viper.go index 0362e79b..b276dbe2 100644 --- a/plugins/config/viper.go +++ b/plugins/config/viper.go @@ -17,17 +17,21 @@ type ViperProvider struct { //////// ENDURE ////////// func (v *ViperProvider) Init() error { v.viper = viper.New() + // read in environment variables that match v.viper.AutomaticEnv() if v.Prefix == "" { return errors.New("prefix should be set") } + v.viper.SetEnvPrefix(v.Prefix) if v.Path == "" { return errors.New("path should be set") } + v.viper.SetConfigFile(v.Path) v.viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + return v.viper.ReadInConfig() } @@ -62,6 +66,11 @@ func (v *ViperProvider) Get(name string) interface{} { return v.viper.Get(name) } +// Has checks if config section exists. +func (v *ViperProvider) Has(name string) bool { + return v.viper.IsSet(name) +} + /////////// PRIVATE ////////////// func parseFlag(flag string) (string, string, error) { diff --git a/plugins/factory/app.go b/plugins/factory/app.go index f9e7944c..753ca2a9 100644 --- a/plugins/factory/app.go +++ b/plugins/factory/app.go @@ -1,7 +1,6 @@ package factory import ( - "context" "errors" "fmt" "os" @@ -21,11 +20,10 @@ type AppConfig struct { Group string Env Env - Relay string // Listen defines connection method and factory to be used to connect to workers: // "pipes", "tcp://:6001", "unix://rr.sock" // This config section must not change on re-configuration. - Listen string + Relay string // RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section // must not change on re-configuration. @@ -33,23 +31,28 @@ type AppConfig struct { } type App struct { - cfg *AppConfig + cfg AppConfig configProvider config.Provider factory roadrunner.Factory } func (app *App) Init(provider config.Provider) error { - app.cfg = &AppConfig{} + app.cfg = AppConfig{} app.configProvider = provider return nil } func (app *App) Configure() error { - err := app.configProvider.UnmarshalKey("app", app.cfg) + err := app.configProvider.UnmarshalKey("app", &app.cfg) if err != nil { return err } + + if app.cfg.Relay == "" { + app.cfg.Relay = "pipes" + } + return nil } @@ -83,30 +86,28 @@ func (app *App) NewCmd(env Env) (func() *exec.Cmd, error) { // todo ENV unused func (app *App) NewFactory(env Env) (roadrunner.Factory, error) { - // if Listen is empty or doesn't contain separator, return error - if app.cfg.Listen == "" || !strings.Contains(app.cfg.Listen, "://") { - return nil, errors.New("relay should be set") - } - - lsn, err := util.CreateListener(app.cfg.Listen) - if err != nil { - return nil, err + if app.cfg.Relay == "" || app.cfg.Relay == "pipes" { + return roadrunner.NewPipeFactory(), nil } - dsn := strings.Split(app.cfg.Listen, "://") + dsn := strings.Split(app.cfg.Relay, "://") if len(dsn) != 2 { return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") } + lsn, err := util.CreateListener(app.cfg.Relay) + if err != nil { + return nil, err + } + switch dsn[0] { // sockets group case "unix": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil case "tcp": return roadrunner.NewSocketServer(lsn, app.cfg.RelayTimeout), nil - // pipes default: - return roadrunner.NewPipeFactory(), nil + return nil, errors.New("invalid DSN (tcp://:6001, unix://file.sock)") } } @@ -116,10 +117,6 @@ func (app *App) Serve() chan error { } func (app *App) Stop() error { - err := app.factory.Close(context.Background()) - if err != nil { - return err - } return nil } diff --git a/plugins/factory/factory.go b/plugins/factory/factory.go index c5490cd6..5d80682d 100644 --- a/plugins/factory/factory.go +++ b/plugins/factory/factory.go @@ -3,9 +3,11 @@ package factory import ( "context" - "github.com/spiral/roadrunner/v2/plugins/events" + "log" + "github.com/fatih/color" "github.com/spiral/roadrunner/v2" + "github.com/spiral/roadrunner/v2/plugins/events" ) type WorkerFactory interface { @@ -23,6 +25,7 @@ func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, e if err != nil { return nil, err } + factory, err := wf.spw.NewFactory(env) if err != nil { return nil, err @@ -37,6 +40,11 @@ func (wf *WFactory) NewWorkerPool(ctx context.Context, opt *roadrunner.Config, e go func() { for e := range p.Events() { wf.eb.Push(e) + if we, ok := e.Payload.(roadrunner.WorkerEvent); ok { + if we.Event == roadrunner.EventWorkerLog { + log.Print(color.YellowString(string(we.Payload.([]byte)))) + } + } } }() diff --git a/plugins/factory/tests/factory_test.go b/plugins/factory/tests/factory_test.go index 38e939e1..72e28f84 100644 --- a/plugins/factory/tests/factory_test.go +++ b/plugins/factory/tests/factory_test.go @@ -46,7 +46,6 @@ func TestFactory(t *testing.T) { t.Fatal(err) } - err = container.Init() if err != nil { t.Fatal(err) diff --git a/plugins/factory/tests/plugin_1.go b/plugins/factory/tests/plugin_1.go index 0c44a0d1..5ab6df73 100644 --- a/plugins/factory/tests/plugin_1.go +++ b/plugins/factory/tests/plugin_1.go @@ -9,8 +9,8 @@ import ( ) type Foo struct { - configProvider config.Provider - spawner factory.Spawner + configProvider config.Provider + spawner factory.Spawner } func (f *Foo) Init(p config.Provider, spw factory.Spawner) error { @@ -2,7 +2,6 @@ package roadrunner import ( "context" - "fmt" "runtime" "time" ) @@ -47,7 +46,9 @@ type Pool interface { Events() chan PoolEvent // Exec one task with given payload and context, returns result or error. - Exec(ctx context.Context, rqs Payload) (Payload, error) + ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) + + Exec(rqs Payload) (Payload, error) // Workers returns worker list associated with the pool. Workers() (workers []WorkerBase) @@ -66,7 +67,7 @@ type Pool interface { // type Config struct { // NumWorkers defines how many sub-processes can be run at once. This value - // might be doubled by Swapper while hot-swap. + // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. NumWorkers int64 // MaxJobs defines how many executions is allowed for the worker until @@ -75,17 +76,17 @@ type Config struct { MaxJobs int64 // AllocateTimeout defines for how long pool will be waiting for a worker to - // be freed to handle the task. + // be freed to handle the task. Defaults to 60s. AllocateTimeout time.Duration // DestroyTimeout defines for how long pool should be waiting for worker to - // properly destroy, if timeout reached worker will be killed. + // properly destroy, if timeout reached worker will be killed. Defaults to 60s. DestroyTimeout time.Duration // TTL defines maximum time worker is allowed to live. TTL int64 - // IdleTTL defines maximum duration worker can spend in idle mode. + // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. IdleTTL int64 // ExecTTL defines maximum lifetime per job. @@ -94,45 +95,21 @@ type Config struct { // MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes. MaxPoolMemory uint64 + // MaxWorkerMemory limits memory per worker. MaxWorkerMemory uint64 - - // config from limit plugin, combine TODO - // single bootstrap TODO, bool - // warmup one worker and start consume requests and then start the rest of the stack - - // max memory for pool - // max ttl - // max idle ttl - - // ATTACHER interface - delete -} - -// InitDefaults allows to init blank config with pre-defined set of default values. -func (cfg *Config) InitDefaults() error { - cfg.AllocateTimeout = time.Minute - cfg.DestroyTimeout = time.Minute - cfg.NumWorkers = int64(runtime.NumCPU()) - - return nil } -// Valid returns error if config not valid. -func (cfg *Config) Valid() error { +// InitDefaults enables default config values. +func (cfg *Config) InitDefaults() { if cfg.NumWorkers == 0 { - return fmt.Errorf("pool.NumWorkers must be set") + cfg.NumWorkers = int64(runtime.NumCPU()) } if cfg.AllocateTimeout == 0 { - return fmt.Errorf("pool.AllocateTimeout must be set") + cfg.AllocateTimeout = time.Minute } if cfg.DestroyTimeout == 0 { - return fmt.Errorf("pool.DestroyTimeout must be set") + cfg.DestroyTimeout = time.Minute } - - if cfg.ExecTTL == 0 { - return fmt.Errorf("pool.ExecTTL must be set") - } - - return nil } diff --git a/pool_supervisor.go b/pool_supervisor.go index 93afb8c6..73c1c5b7 100644 --- a/pool_supervisor.go +++ b/pool_supervisor.go @@ -47,9 +47,11 @@ func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, wa // just set to a big number, 5GB maxPoolMemory = 5000 * MB } + if watchTimeout == 0 { watchTimeout = 60 } + return &staticPoolSupervisor{ maxWorkerMemory: maxWorkerMemory, maxPoolMemory: maxPoolMemory, diff --git a/pool_test.go b/pool_test.go deleted file mode 100644 index 998dd9d4..00000000 --- a/pool_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package roadrunner - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" -) - -func Test_NumWorkers(t *testing.T) { - cfg := Config{ - AllocateTimeout: time.Second, - DestroyTimeout: time.Second * 10, - } - err := cfg.Valid() - - assert.NotNil(t, err) - assert.Equal(t, "pool.NumWorkers must be set", err.Error()) -} - -func Test_NumWorkers_Default(t *testing.T) { - cfg := Config{ - AllocateTimeout: time.Second, - DestroyTimeout: time.Second * 10, - ExecTTL: time.Second * 5, - } - - assert.NoError(t, cfg.InitDefaults()) - err := cfg.Valid() - assert.Nil(t, err) -} - -func Test_AllocateTimeout(t *testing.T) { - cfg := Config{ - NumWorkers: 10, - DestroyTimeout: time.Second * 10, - } - err := cfg.Valid() - - assert.NotNil(t, err) - assert.Equal(t, "pool.AllocateTimeout must be set", err.Error()) -} - -func Test_DestroyTimeout(t *testing.T) { - cfg := Config{ - NumWorkers: 10, - AllocateTimeout: time.Second, - } - err := cfg.Valid() - - assert.NotNil(t, err) - assert.Equal(t, "pool.DestroyTimeout must be set", err.Error()) -} diff --git a/socket_factory_test.go b/socket_factory_test.go index 45443337..0c953b33 100644 --- a/socket_factory_test.go +++ b/socket_factory_test.go @@ -206,7 +206,7 @@ func Test_Tcp_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -245,7 +245,7 @@ func Test_Tcp_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -390,7 +390,7 @@ func Test_Unix_Broken(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) @@ -433,7 +433,7 @@ func Test_Unix_Echo(t *testing.T) { t.Fatal(err) } - res, err := sw.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -511,7 +511,7 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -579,7 +579,7 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { } for n := 0; n < b.N; n++ { - if _, err := sw.Exec(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := sw.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { b.Fail() } } diff --git a/static_pool.go b/static_pool.go index 2e72864d..bc990da5 100644 --- a/static_pool.go +++ b/static_pool.go @@ -14,6 +14,8 @@ const ( StopRequest = "{\"stop\":true}" ) +var bCtx = context.Background() + // StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack. type StaticPool struct { // pool behaviour @@ -40,9 +42,7 @@ type PoolEvent struct { // supervisor Supervisor, todo: think about it // stack func() (WorkerBase, error), func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error) { - if err := cfg.Valid(); err != nil { - return nil, errors.Wrap(err, "config") - } + cfg.InitDefaults() p := &StaticPool{ cfg: cfg, @@ -92,8 +92,63 @@ func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error { return p.ww.RemoveWorker(ctx, wb) } +func (p *StaticPool) Exec(rqs Payload) (Payload, error) { + w, err := p.ww.GetFreeWorker(context.Background()) + if err != nil && errors.Is(err, ErrWatcherStopped) { + return EmptyPayload, ErrWatcherStopped + } else if err != nil { + return EmptyPayload, err + } + + sw := w.(SyncWorker) + + rsp, err := sw.Exec(rqs) + if err != nil { + errJ := p.checkMaxJobs(bCtx, w) + if errJ != nil { + return EmptyPayload, fmt.Errorf("%v, %v", err, errJ) + } + // soft job errors are allowed + if _, jobError := err.(TaskError); jobError { + p.ww.PushWorker(w) + return EmptyPayload, err + } + + sw.State().Set(StateInvalid) + errS := w.Stop(bCtx) + if errS != nil { + return EmptyPayload, fmt.Errorf("%v, %v", err, errS) + } + + return EmptyPayload, err + } + + // worker want's to be terminated + if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { + w.State().Set(StateInvalid) + err = w.Stop(bCtx) + if err != nil { + return EmptyPayload, err + } + return p.ExecWithContext(bCtx, rqs) + } + + if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { + err = p.ww.AllocateNew(bCtx) + if err != nil { + return EmptyPayload, err + } + } else { + p.muw.Lock() + p.ww.PushWorker(w) + p.muw.Unlock() + } + return rsp, nil +} + // Exec one task with given payload and context, returns result or error. -func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) { +func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { + // todo: why TODO passed here? getWorkerCtx, cancel := context.WithTimeout(context.TODO(), p.cfg.AllocateTimeout) defer cancel() w, err := p.ww.GetFreeWorker(getWorkerCtx) @@ -105,10 +160,16 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) { sw := w.(SyncWorker) - execCtx, cancel2 := context.WithTimeout(context.TODO(), p.cfg.ExecTTL) - defer cancel2() + var execCtx context.Context + if p.cfg.ExecTTL != 0 { + var cancel2 context.CancelFunc + execCtx, cancel2 = context.WithTimeout(context.TODO(), p.cfg.ExecTTL) + defer cancel2() + } else { + execCtx = ctx + } - rsp, err := sw.Exec(execCtx, rqs) + rsp, err := sw.ExecWithContext(execCtx, rqs) if err != nil { errJ := p.checkMaxJobs(ctx, w) if errJ != nil { @@ -136,7 +197,7 @@ func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error) { if err != nil { return EmptyPayload, err } - return p.Exec(ctx, rqs) + return p.ExecWithContext(ctx, rqs) } if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { diff --git a/static_pool_test.go b/static_pool_test.go index b2ab4713..fd8124ac 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -50,7 +50,7 @@ func Test_StaticPool_Invalid(t *testing.T) { assert.Error(t, err) } -func Test_ConfigError(t *testing.T) { +func Test_ConfigNoErrorInitDefaults(t *testing.T) { p, err := NewPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, @@ -61,8 +61,8 @@ func Test_ConfigError(t *testing.T) { }, ) - assert.Nil(t, p) - assert.Error(t, err) + assert.NotNil(t, p) + assert.NoError(t, err) } func Test_StaticPool_Echo(t *testing.T) { @@ -79,7 +79,7 @@ func Test_StaticPool_Echo(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -103,7 +103,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(ctx, Payload{Body: []byte("hello"), Context: nil}) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: nil}) assert.NoError(t, err) assert.NotNil(t, res) @@ -127,7 +127,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(ctx, Payload{Body: []byte("hello"), Context: []byte("world")}) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello"), Context: []byte("world")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -150,7 +150,7 @@ func Test_StaticPool_JobError(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -188,7 +188,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { } }() - res, err := p.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) @@ -212,7 +212,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { assert.NotNil(t, p) - res, err := p.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -290,11 +290,11 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ := p.Exec(ctx, Payload{Body: []byte("hello")}) + res, _ := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -337,14 +337,14 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { var lastPID string lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, err := p.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) if err != nil { t.Fatal(err) } assert.Equal(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { - res, err := p.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.NoError(t, err) assert.NotNil(t, res) @@ -375,7 +375,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { assert.NoError(t, err) p.Destroy(ctx) - _, err = p.Exec(ctx, Payload{Body: []byte("100")}) + _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -398,7 +398,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { assert.NoError(t, err) go func() { - _, err := p.Exec(ctx, Payload{Body: []byte("100")}) + _, err := p.ExecWithContext(ctx, Payload{Body: []byte("100")}) if err != nil { t.Errorf("error executing payload: error %v", err) } @@ -406,7 +406,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { time.Sleep(time.Millisecond * 10) p.Destroy(ctx) - _, err = p.Exec(ctx, Payload{Body: []byte("100")}) + _, err = p.ExecWithContext(ctx, Payload{Body: []byte("100")}) assert.Error(t, err) } @@ -433,7 +433,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { w.State().Set(StateErrored) } - _, err = p.Exec(ctx, Payload{Body: []byte("hello")}) + _, err = p.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) } @@ -472,7 +472,7 @@ func Benchmark_Pool_Echo(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { b.Fail() } } @@ -499,7 +499,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { wg.Add(1) go func() { defer wg.Done() - if _, err := p.Exec(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } @@ -529,7 +529,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - if _, err := p.Exec(ctx, Payload{Body: []byte("hello")}); err != nil { + if _, err := p.ExecWithContext(ctx, Payload{Body: []byte("hello")}); err != nil { b.Fail() log.Println(err) } diff --git a/sync_worker.go b/sync_worker.go index 45629f3e..a6e1ed01 100644 --- a/sync_worker.go +++ b/sync_worker.go @@ -14,8 +14,11 @@ var EmptyPayload = Payload{} type SyncWorker interface { // WorkerBase provides basic functionality for the SyncWorker WorkerBase - // Exec used to execute payload on the SyncWorker - Exec(ctx context.Context, rqs Payload) (Payload, error) + // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS + Exec(rqs Payload) (Payload, error) + + // ExecWithContext allow to set ExecTTL + ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) } type taskWorker struct { @@ -33,7 +36,7 @@ type twexec struct { err error } -func (tw *taskWorker) Exec(ctx context.Context, rqs Payload) (Payload, error) { +func (tw *taskWorker) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { c := make(chan twexec) go func() { if len(rqs.Body) == 0 && len(rqs.Context) == 0 { @@ -92,6 +95,36 @@ func (tw *taskWorker) Exec(ctx context.Context, rqs Payload) (Payload, error) { } } +// +func (tw *taskWorker) Exec(rqs Payload) (Payload, error) { + if len(rqs.Body) == 0 && len(rqs.Context) == 0 { + return EmptyPayload, fmt.Errorf("payload can not be empty") + } + + if tw.w.State().Value() != StateReady { + return EmptyPayload, fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()) + } + + // set last used time + tw.w.State().SetLastUsed(uint64(time.Now().UnixNano())) + tw.w.State().Set(StateWorking) + + rsp, err := tw.execPayload(rqs) + if err != nil { + if _, ok := err.(TaskError); !ok { + tw.w.State().Set(StateErrored) + tw.w.State().RegisterExec() + } + return EmptyPayload, err + } + + tw.w.State().Set(StateReady) + tw.w.State().RegisterExec() + + return rsp, nil + +} + func (tw *taskWorker) execPayload(rqs Payload) (Payload, error) { // two things; todo: merge if err := sendControl(tw.w.Relay(), rqs.Context); err != nil { diff --git a/sync_worker_test.go b/sync_worker_test.go index e1cec4b6..f4868009 100644 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -34,7 +34,7 @@ func Test_Echo(t *testing.T) { } }() - res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -65,7 +65,7 @@ func Test_BadPayload(t *testing.T) { } }() - res, err := syncWorker.Exec(ctx, EmptyPayload) + res, err := syncWorker.ExecWithContext(ctx, EmptyPayload) assert.Error(t, err) assert.Nil(t, res.Body) @@ -94,7 +94,7 @@ func Test_NotStarted_Exec(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Body) @@ -143,7 +143,7 @@ func Test_Echo_Slow(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.Nil(t, err) assert.NotNil(t, res) @@ -185,7 +185,7 @@ func Test_Broken(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -215,7 +215,7 @@ func Test_Error(t *testing.T) { t.Fatal(err) } - res, err := syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + res, err := syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) assert.NotNil(t, err) assert.Nil(t, res.Body) assert.Nil(t, res.Context) @@ -244,19 +244,19 @@ func Test_NumExecs(t *testing.T) { t.Fatal(err) } - _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(1), w.State().NumExecs()) - _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } assert.Equal(t, int64(2), w.State().NumExecs()) - _, err = syncWorker.Exec(ctx, Payload{Body: []byte("hello")}) + _, err = syncWorker.ExecWithContext(ctx, Payload{Body: []byte("hello")}) if err != nil { t.Errorf("fail to execute payload: error %v", err) } diff --git a/util/doc.go b/util/doc.go index c6006de4..a3798715 100644 --- a/util/doc.go +++ b/util/doc.go @@ -2,4 +2,4 @@ package util /* This package should not contain roadrunner dependencies, only system or third-party - */ +*/ diff --git a/util/isolate_win.go b/util/isolate_win.go index 77674b3b..6756d59f 100644 --- a/util/isolate_win.go +++ b/util/isolate_win.go @@ -14,4 +14,4 @@ func IsolateProcess(cmd *exec.Cmd) { func ExecuteFromUser(cmd *exec.Cmd, u string) error { return nil -}
\ No newline at end of file +} diff --git a/util/network_windows_test.go b/util/network_windows_test.go index a5a8064e..b6648ed0 100644 --- a/util/network_windows_test.go +++ b/util/network_windows_test.go @@ -13,4 +13,4 @@ func TestCreateListener(t *testing.T) { _, err = CreateListener("aaa://192.168.0.1") assert.Error(t, err, "Invalid Protocol (tcp://:6001, unix://file.sock)") -}
\ No newline at end of file +} @@ -24,7 +24,7 @@ const ( EventWorkerLog // EventWorkerWaitDone triggered when worker exit from process Wait - EventWorkerWaitDone + EventWorkerWaitDone // todo: implemented? EventWorkerBufferClosed @@ -61,6 +61,7 @@ type WorkerBase interface { // Start used to run Cmd and immediately return Start() error + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails @@ -69,11 +70,14 @@ type WorkerBase interface { // Stop sends soft termination command to the WorkerProcess and waits for process completion. Stop(ctx context.Context) error + // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! Kill(ctx context.Context) error + // Relay returns attached to worker goridge relay Relay() goridge.Relay + // AttachRelay used to attach goridge relay to the worker process AttachRelay(rl goridge.Relay) } |