From 6d3bd7d47aa9d08847eecfb241298f323aae05ca Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Mon, 26 Oct 2020 21:12:46 +0300 Subject: - lazy load test --- pool.go | 5 +++++ static_pool_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/pool.go b/pool.go index bc57bcbd..de7e837d 100755 --- a/pool.go +++ b/pool.go @@ -75,6 +75,11 @@ type Config struct { // worker handle as many tasks as it can. MaxJobs int64 + // Deferred flag enables slower working mode which inits empty worker ahead of every request. Useful to debug + // applications with heavy bootload phase. Do not use at production. It is also keeps pool without any workers + // until first request. + Deferred bool + // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. Defaults to 60s. AllocateTimeout time.Duration diff --git a/static_pool_test.go b/static_pool_test.go index ec80e92a..c9a43f69 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -301,6 +301,50 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { } } +func Test_StaticPool_Debug_Worker(t *testing.T) { + ctx := context.Background() + p, err := NewPool( + ctx, + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, + NewPipeFactory(), + Config{ + Deferred: true, + NumWorkers: 1, + MaxJobs: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + assert.NoError(t, err) + defer p.Destroy(ctx) + + assert.NotNil(t, p) + + assert.Len(t, p.Workers(), 0) + + var lastPID string + res, _ := p.Exec(Payload{Body: []byte("hello")}) + + assert.Len(t, p.Workers(), 1) + + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + res, _ = p.Exec(Payload{Body: []byte("hello")}) + assert.NotEqual(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + res, err := p.Exec(Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + // identical to replace but controlled on worker side func Test_StaticPool_Stop_Worker(t *testing.T) { ctx := context.Background() -- cgit v1.2.3 From 722584b153c009612ccbaf1fb7a9911f7afae476 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Mon, 26 Oct 2020 21:25:33 +0300 Subject: - lazy load test --- pool.go | 6 ++---- static_pool_test.go | 9 ++------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/pool.go b/pool.go index de7e837d..b1b58b6a 100755 --- a/pool.go +++ b/pool.go @@ -75,10 +75,8 @@ type Config struct { // worker handle as many tasks as it can. MaxJobs int64 - // Deferred flag enables slower working mode which inits empty worker ahead of every request. Useful to debug - // applications with heavy bootload phase. Do not use at production. It is also keeps pool without any workers - // until first request. - Deferred bool + // HeavyLoad flag creates new fresh worker before every request. + HeavyLoad bool // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. Defaults to 60s. diff --git a/static_pool_test.go b/static_pool_test.go index c9a43f69..ffa39c2f 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -308,7 +308,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, NewPipeFactory(), Config{ - Deferred: true, + HeavyLoad: true, NumWorkers: 1, MaxJobs: 1, AllocateTimeout: time.Second, @@ -320,15 +320,10 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.NotNil(t, p) - assert.Len(t, p.Workers(), 0) + assert.Len(t, p.Workers(), 1) var lastPID string res, _ := p.Exec(Payload{Body: []byte("hello")}) - - assert.Len(t, p.Workers(), 1) - - lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) - res, _ = p.Exec(Payload{Body: []byte("hello")}) assert.NotEqual(t, lastPID, string(res.Body)) for i := 0; i < 10; i++ { -- cgit v1.2.3 From 9d2de5e46110c07eceb87dcfde3af9b6a6c4a76c Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Mon, 26 Oct 2020 21:28:11 +0300 Subject: - lazy load test --- static_pool.go | 4 ++++ static_pool_test.go | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/static_pool.go b/static_pool.go index 4ecbdd41..2905742b 100755 --- a/static_pool.go +++ b/static_pool.go @@ -43,6 +43,10 @@ type StaticPool struct { func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) { cfg.InitDefaults() + if cfg.HeavyLoad { + cfg.NumWorkers = 0 + } + p := &StaticPool{ cfg: cfg, cmd: cmd, diff --git a/static_pool_test.go b/static_pool_test.go index ffa39c2f..a3b19bad 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -320,12 +320,15 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { assert.NotNil(t, p) - assert.Len(t, p.Workers(), 1) + assert.Len(t, p.Workers(), 0) var lastPID string res, _ := p.Exec(Payload{Body: []byte("hello")}) assert.NotEqual(t, lastPID, string(res.Body)) + // todo: necessary? + assert.Len(t, p.Workers(), 0) + for i := 0; i < 10; i++ { lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) res, err := p.Exec(Payload{Body: []byte("hello")}) -- cgit v1.2.3 From 2176584129e493e08aed158bc050070d520ee183 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Mon, 26 Oct 2020 21:46:23 +0300 Subject: - implement worker debug mode --- errors.go | 6 ++-- errors_test.go | 2 +- pool.go | 6 ++-- static_pool.go | 90 ++++++++++++++++++++++++++++++++--------------------- static_pool_test.go | 9 ++---- sync_worker.go | 12 +++---- sync_worker_test.go | 2 +- worker_watcher.go | 2 +- 8 files changed, 72 insertions(+), 57 deletions(-) diff --git a/errors.go b/errors.go index 52356549..7c91a92b 100755 --- a/errors.go +++ b/errors.go @@ -1,11 +1,11 @@ package roadrunner -// JobError is job level error (no WorkerProcess halt), wraps at top +// ExecError is job level error (no WorkerProcess halt), wraps at top // of error context -type JobError []byte +type ExecError []byte // Error converts error context to string -func (te JobError) Error() string { +func (te ExecError) Error() string { return string(te) } diff --git a/errors_test.go b/errors_test.go index 75a86840..86ab908d 100755 --- a/errors_test.go +++ b/errors_test.go @@ -8,7 +8,7 @@ import ( ) func Test_JobError_Error(t *testing.T) { - e := JobError([]byte("error")) + e := ExecError([]byte("error")) assert.Equal(t, "error", e.Error()) } diff --git a/pool.go b/pool.go index b1b58b6a..aca1b340 100755 --- a/pool.go +++ b/pool.go @@ -66,6 +66,9 @@ type Pool interface { // Configures the pool behaviour. type Config struct { + // Debug flag creates new fresh worker before every request. + Debug bool + // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. NumWorkers int64 @@ -75,9 +78,6 @@ type Config struct { // worker handle as many tasks as it can. MaxJobs int64 - // HeavyLoad flag creates new fresh worker before every request. - HeavyLoad bool - // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. Defaults to 60s. AllocateTimeout time.Duration diff --git a/static_pool.go b/static_pool.go index 2905742b..6f247d9e 100755 --- a/static_pool.go +++ b/static_pool.go @@ -43,8 +43,9 @@ type StaticPool struct { func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Config) (Pool, error) { cfg.InitDefaults() - if cfg.HeavyLoad { + if cfg.Debug { cfg.NumWorkers = 0 + cfg.MaxJobs = 1 } p := &StaticPool{ @@ -86,26 +87,30 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Con } // AddListener connects event listener to the pool. -func (p *StaticPool) AddListener(listener util.EventListener) { - p.events.AddListener(listener) +func (sp *StaticPool) AddListener(listener util.EventListener) { + sp.events.AddListener(listener) } // Config returns associated pool configuration. Immutable. -func (p *StaticPool) GetConfig() Config { - return p.cfg +func (sp *StaticPool) GetConfig() Config { + return sp.cfg } // Workers returns worker list associated with the pool. -func (p *StaticPool) Workers() (workers []WorkerBase) { - return p.ww.WorkersList() +func (sp *StaticPool) Workers() (workers []WorkerBase) { + return sp.ww.WorkersList() } -func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error { - return p.ww.RemoveWorker(ctx, wb) +func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error { + return sp.ww.RemoveWorker(ctx, wb) } -func (p *StaticPool) Exec(rqs Payload) (Payload, error) { - w, err := p.ww.GetFreeWorker(context.Background()) +func (sp *StaticPool) Exec(p Payload) (Payload, error) { + if sp.cfg.Debug { + return sp.execDebug(p) + } + + w, err := sp.ww.GetFreeWorker(context.Background()) if err != nil && errors.Is(err, ErrWatcherStopped) { return EmptyPayload, ErrWatcherStopped } else if err != nil { @@ -114,30 +119,30 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { sw := w.(SyncWorker) - rsp, err := sw.Exec(rqs) + rsp, err := sw.Exec(p) if err != nil { // soft job errors are allowed - if _, jobError := err.(JobError); jobError { - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err := p.ww.AllocateNew(bCtx) + if _, jobError := err.(ExecError); jobError { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err := sp.ww.AllocateNew(bCtx) if err != nil { - p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) } w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } } else { - p.ww.PushWorker(w) + sp.ww.PushWorker(w) } return EmptyPayload, err } sw.State().Set(StateInvalid) - p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) + sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) if errS != nil { @@ -152,25 +157,38 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } - return p.Exec(rqs) + return sp.Exec(p) } - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err = p.ww.AllocateNew(bCtx) + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew(bCtx) if err != nil { return EmptyPayload, err } } else { - p.muw.Lock() - p.ww.PushWorker(w) - p.muw.Unlock() + sp.ww.PushWorker(w) } return rsp, nil } +func (sp *StaticPool) execDebug(p Payload) (Payload, error) { + sw, err := sp.ww.allocator() + if err != nil { + return EmptyPayload, err + } + + r, err := sw.(SyncWorker).Exec(p) + + if stopErr := sw.Stop(context.Background()); stopErr != nil { + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: sw, Payload: err}) + } + + return r, err +} + // Exec one task with given payload and context, returns result or error. // func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { // // todo: why TODO passed here? @@ -204,7 +222,7 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { // } // // // soft job errors are allowed -// if _, jobError := err.(JobError); jobError { +// if _, jobError := err.(ExecError); jobError { // p.ww.PushWorker(w) // return EmptyPayload, err // } @@ -243,18 +261,18 @@ func (p *StaticPool) Exec(rqs Payload) (Payload, error) { // } // Destroy all underlying stack (but let them to complete the task). -func (p *StaticPool) Destroy(ctx context.Context) { - p.ww.Destroy(ctx) +func (sp *StaticPool) Destroy(ctx context.Context) { + sp.ww.Destroy(ctx) } // allocate required number of stack -func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { +func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]WorkerBase, error) { var workers []WorkerBase // constant number of stack simplify logic for i := int64(0); i < numWorkers; i++ { - ctx, cancel := context.WithTimeout(ctx, p.cfg.AllocateTimeout) - w, err := p.factory.SpawnWorkerWithContext(ctx, p.cmd()) + ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) if err != nil { cancel() return nil, err @@ -265,11 +283,11 @@ func (p *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]W return workers, nil } -func (p *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err := p.ww.AllocateNew(ctx) +func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err := sp.ww.AllocateNew(ctx) if err != nil { - p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) return err } } diff --git a/static_pool_test.go b/static_pool_test.go index a3b19bad..b75bd0bf 100755 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -152,7 +152,7 @@ func Test_StaticPool_JobError(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, JobError{}, err) + assert.IsType(t, ExecError{}, err) assert.Equal(t, "hello", err.Error()) } @@ -308,9 +308,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, NewPipeFactory(), Config{ - HeavyLoad: true, - NumWorkers: 1, - MaxJobs: 1, + Debug: true, AllocateTimeout: time.Second, DestroyTimeout: time.Second, }, @@ -326,11 +324,10 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { res, _ := p.Exec(Payload{Body: []byte("hello")}) assert.NotEqual(t, lastPID, string(res.Body)) - // todo: necessary? assert.Len(t, p.Workers(), 0) for i := 0; i < 10; i++ { - lastPID = strconv.Itoa(int(p.Workers()[0].Pid())) + assert.Len(t, p.Workers(), 0) res, err := p.Exec(Payload{Body: []byte("hello")}) assert.NoError(t, err) diff --git a/sync_worker.go b/sync_worker.go index d7c15e88..2f3eb1e4 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -18,7 +18,7 @@ type SyncWorker interface { WorkerBase // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS - Exec(rqs Payload) (Payload, error) + Exec(p Payload) (Payload, error) } type syncWorker struct { @@ -47,7 +47,7 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) { rsp, err := tw.execPayload(p) if err != nil { - if _, ok := err.(JobError); !ok { + if _, ok := err.(ExecError); !ok { tw.w.State().Set(StateErrored) tw.w.State().RegisterExec() } @@ -60,13 +60,13 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) { return rsp, nil } -func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) { +func (tw *syncWorker) execPayload(p Payload) (Payload, error) { // two things; todo: merge - if err := sendControl(tw.w.Relay(), rqs.Context); err != nil { + if err := sendControl(tw.w.Relay(), p.Context); err != nil { return EmptyPayload, errors.Wrap(err, "header error") } - if err := tw.w.Relay().Send(rqs.Body, 0); err != nil { + if err := tw.w.Relay().Send(p.Body, 0); err != nil { return EmptyPayload, errors.Wrap(err, "sender error") } @@ -83,7 +83,7 @@ func (tw *syncWorker) execPayload(rqs Payload) (Payload, error) { } if pr.HasFlag(goridge.PayloadError) { - return EmptyPayload, JobError(rsp.Context) + return EmptyPayload, ExecError(rsp.Context) } // add streaming support :) diff --git a/sync_worker_test.go b/sync_worker_test.go index 7f969283..1bc2deb1 100755 --- a/sync_worker_test.go +++ b/sync_worker_test.go @@ -206,7 +206,7 @@ func Test_Error(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) - assert.IsType(t, JobError{}, err) + assert.IsType(t, ExecError{}, err) assert.Equal(t, "hello", err.Error()) } diff --git a/worker_watcher.go b/worker_watcher.go index 25c88a1a..5ae54024 100755 --- a/worker_watcher.go +++ b/worker_watcher.go @@ -19,7 +19,7 @@ type Stack struct { func NewWorkersStack() *Stack { return &Stack{ - workers: make([]WorkerBase, 0, 12), + workers: make([]WorkerBase, 0), } } -- cgit v1.2.3