author     Valery Piashchynski <[email protected]>   2020-11-27 11:19:27 +0300
committer  GitHub <[email protected]>                2020-11-27 11:19:27 +0300
commit     b5020bfce6b5362400cb9b578fe32c1a6ed5d61a (patch)
tree       902afaca9b225cfe9e3b498b97cc83dec13fcd9a
parent     46ae5dcc22d971b0f909bce23ec8fdef26811ed6 (diff)
parent     849a03b8ead6fe8e65ab1a1e5653a57c12502dd1 (diff)
Merge pull request #416 from spiral/feature/static_pool_trigger (tag: v2.0.0-alpha22)
Add new pool event: EventNoFreeWorkers
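
Callers can react either to the event itself or to the errors.NoFreeWorkers kind that static_pool.go attaches in the diff below. A minimal sketch of the caller-side check, written as if it lived in the same roadrunner package and assuming the Pool interface exposes Exec(Payload) the way StaticPool does; the execWithBusyCheck helper is purely illustrative and not part of this change:

package roadrunner

import (
	"log"

	"github.com/spiral/errors"
)

// execWithBusyCheck is a hypothetical wrapper around Pool.Exec that logs when the
// pool stays saturated past AllocateTimeout, the same condition that makes the
// pool push PoolEvent{Event: EventNoFreeWorkers}.
func execWithBusyCheck(p Pool, req Payload) (Payload, error) {
	resp, err := p.Exec(req)
	if err != nil {
		// errors.Is from github.com/spiral/errors matches by kind, as in the diff below
		if errors.Is(errors.NoFreeWorkers, err) {
			log.Println("pool saturated, no free workers:", err)
		}
		return EmptyPayload, err
	}
	return resp, nil
}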
-rwxr-xr-x  go.mod                            2
-rwxr-xr-x  go.sum                            2
-rw-r--r--  plugins/http/tests/http_test.go   2
-rwxr-xr-x  pool.go                           5
-rwxr-xr-x  static_pool.go                   59
-rwxr-xr-x  static_pool_test.go               5
-rwxr-xr-x  supervisor_pool.go               12
-rwxr-xr-x  sync_worker.go                   14
-rwxr-xr-x  sync_worker_test.go              10
-rwxr-xr-x  worker_watcher.go                36
10 files changed, 79 insertions, 68 deletions
diff --git a/go.mod b/go.mod
@@ -14,7 +14,7 @@ require (
 	github.com/shirou/gopsutil v3.20.10+incompatible
 	github.com/spf13/viper v1.7.1
 	github.com/spiral/endure v1.0.0-beta20
-	github.com/spiral/errors v1.0.4
+	github.com/spiral/errors v1.0.5
 	github.com/spiral/goridge/v2 v2.4.6
 	github.com/stretchr/testify v1.6.1
 	github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a
diff --git a/go.sum b/go.sum
@@ -244,6 +244,8 @@ github.com/spiral/endure v1.0.0-beta20 h1:QD3EJ6CRLgeo/6trfnlUcQhH3vrK8Hvf9ceDpd
 github.com/spiral/endure v1.0.0-beta20/go.mod h1:qCU2/4gAItVESzUK0yPExmUTlTcpRLqJUgcV+nqxn+o=
 github.com/spiral/errors v1.0.4 h1:Y6Bop9GszdDh+Dn3s5aqsGebNLydqZ1F6OdOIQ9EpU0=
 github.com/spiral/errors v1.0.4/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
+github.com/spiral/errors v1.0.5 h1:TwlR9cZtTgnZrSngcEUpyiMO9yJ45gdQ+XcrCRoCCAM=
+github.com/spiral/errors v1.0.5/go.mod h1:SwMSZVdZkkJVgXNNafccqOaxWg0XPzVU/dEdUEInE0o=
 github.com/spiral/goridge/v2 v2.4.6 h1:9u/mrxCtOSy0lnumrpPCSOlGBX/Vprid/hFsnzWrd6k=
 github.com/spiral/goridge/v2 v2.4.6/go.mod h1:mYjL+Ny7nVfLqjRwIYV2pUSQ61eazvVclHII6FfZfYc=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
diff --git a/plugins/http/tests/http_test.go b/plugins/http/tests/http_test.go
index 451566ca..ed98d17e 100644
--- a/plugins/http/tests/http_test.go
+++ b/plugins/http/tests/http_test.go
@@ -811,7 +811,7 @@ func TestHttpMiddleware(t *testing.T) {

 	wg.Add(1)
 	go func() {
-		tt := time.NewTimer(time.Second * 10)
+		tt := time.NewTimer(time.Second * 15)
 		defer wg.Done()
 		for {
 			select {
diff --git a/pool.go b/pool.go
@@ -30,6 +30,9 @@ const (
 	// EventSupervisorError triggered when supervisor can not complete work.
 	EventSupervisorError

+	// EventNoFreeWorkers triggered when there are no free workers in the stack and timeout for worker allocate elapsed
+	EventNoFreeWorkers
+
 	// todo: EventMaxMemory caused when worker consumes more memory than allowed.
 	EventMaxMemory

@@ -60,7 +63,7 @@ type Pool interface {
 	Workers() (workers []WorkerBase)

 	// Remove worker from the pool.
-	RemoveWorker(ctx context.Context, worker WorkerBase) error
+	RemoveWorker(worker WorkerBase) error

 	// Destroy all underlying stack (but let them to complete the task).
 	Destroy(ctx context.Context)
diff --git a/static_pool.go b/static_pool.go
index d5511018..b626a499 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -82,7 +82,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo
 	}

 	// put stack in the pool
-	err = p.ww.AddToWatch(ctx, workers)
+	err = p.ww.AddToWatch(workers)
 	if err != nil {
 		return nil, errors.E(op, err)
 	}
@@ -132,16 +132,18 @@ func (sp *StaticPool) Workers() (workers []WorkerBase) {
 	return sp.ww.WorkersList()
 }

-func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error {
-	return sp.ww.RemoveWorker(ctx, wb)
+func (sp *StaticPool) RemoveWorker(wb WorkerBase) error {
+	return sp.ww.RemoveWorker(wb)
 }

 func (sp *StaticPool) Exec(p Payload) (Payload, error) {
-	const op = errors.Op("Exec")
+	const op = errors.Op("exec")
 	if sp.cfg.Debug {
 		return sp.execDebug(p)
 	}
-	w, err := sp.ww.GetFreeWorker(context.Background())
+	ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+	defer cancel()
+	w, err := sp.getWorker(ctxGetFree, op)
 	if err != nil {
 		return EmptyPayload, errors.E(op, err)
 	}
@@ -171,7 +173,7 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
 	}

 	if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
-		err = sp.ww.AllocateNew(bCtx)
+		err = sp.ww.AllocateNew()
 		if err != nil {
 			return EmptyPayload, errors.E(op, err)
 		}
@@ -189,14 +191,17 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) {
 }

 func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
-	const op = errors.Op("Exec with context")
-	w, err := sp.ww.GetFreeWorker(context.Background())
+	const op = errors.Op("exec with context")
+	ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
+	defer cancel()
+	w, err := sp.getWorker(ctxGetFree, op)
 	if err != nil {
 		return EmptyPayload, errors.E(op, err)
 	}

 	sw := w.(SyncWorker)

+	// apply all before function
 	if len(sp.before) > 0 {
 		for i := 0; i < len(sp.before); i++ {
 			rqs = sp.before[i](rqs)
@@ -220,7 +225,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
 	}

 	if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
-		err = sp.ww.AllocateNew(bCtx)
+		err = sp.ww.AllocateNew()
 		if err != nil {
 			return EmptyPayload, errors.E(op, err)
 		}
@@ -228,6 +233,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
 		}
 		sp.ww.PushWorker(sw)
 	}
+	// apply all after functions
 	if len(sp.after) > 0 {
 		for i := 0; i < len(sp.after); i++ {
 			rsp = sp.after[i](rqs, rsp)
@@ -237,6 +243,21 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload
 	}
 	return rsp, nil
 }
+
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (WorkerBase, error) {
+	// GetFreeWorker function consumes context with timeout
+	w, err := sp.ww.GetFreeWorker(ctxGetFree)
+	if err != nil {
+		// if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout
+		if errors.Is(errors.NoFreeWorkers, err) {
+			sp.events.Push(PoolEvent{Event: EventNoFreeWorkers, Payload: errors.E(op, err)})
+			return nil, errors.E(op, err)
+		}
+		// else if err not nil - return error
+		return nil, errors.E(op, err)
+	}
+	return w, nil
+}
 // Destroy all underlying stack (but let them to complete the task).
 func (sp *StaticPool) Destroy(ctx context.Context) {
 	sp.ww.Destroy(ctx)
@@ -246,11 +267,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
 	return func(err error, w WorkerBase) (Payload, error) {
 		const op = errors.Op("error encoder")
 		// soft job errors are allowed
-		if errors.Is(errors.Exec, err) {
+		if errors.Is(errors.ErrSoftJob, err) {
 			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
-				err = sp.ww.AllocateNew(bCtx)
+				err = sp.ww.AllocateNew()
 				if err != nil {
-					sp.events.Push(PoolEvent{Event: EventPoolError, Payload: errors.E(op, err)})
+					sp.events.Push(PoolEvent{Event: EventWorkerConstruct, Payload: errors.E(op, err)})
 				}

 			w.State().Set(StateInvalid)
@@ -318,22 +339,10 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]
 		w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
 		if err != nil {
 			cancel()
-			return nil, errors.E(op, err)
+			return nil, errors.E(op, errors.WorkerAllocate, err)
 		}
 		workers = append(workers, w)
 		cancel()
 	}
 	return workers, nil
 }
-
-func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error {
-	const op = errors.Op("check max jobs")
-	if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
-		err := sp.ww.AllocateNew(ctx)
-		if err != nil {
-			sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
-			return errors.E(op, err)
-		}
-	}
-	return nil
-}
diff --git a/static_pool_test.go b/static_pool_test.go
index e97e2034..2823cbc4 100755
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -153,7 +153,7 @@ func Test_StaticPool_JobError(t *testing.T) {
 	assert.Nil(t, res.Body)
 	assert.Nil(t, res.Context)

-	if errors.Is(errors.Exec, err) == false {
+	if errors.Is(errors.ErrSoftJob, err) == false {
 		t.Fatal("error should be of type errors.Exec")
 	}

@@ -273,6 +273,9 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
 		},
 	)
 	assert.Error(t, err)
+	if !errors.Is(errors.WorkerAllocate, err) {
+		t.Fatal("error should be of type WorkerAllocate")
+	}
 	assert.Nil(t, p)
 }

diff --git a/supervisor_pool.go b/supervisor_pool.go
index 6fcb71e6..dfec5559 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -102,8 +102,8 @@ func (sp *supervisedPool) Workers() (workers []WorkerBase) {
 	return sp.pool.Workers()
 }

-func (sp *supervisedPool) RemoveWorker(ctx context.Context, worker WorkerBase) error {
-	return sp.pool.RemoveWorker(ctx, worker)
+func (sp *supervisedPool) RemoveWorker(worker WorkerBase) error {
+	return sp.pool.RemoveWorker(worker)
 }

 func (sp *supervisedPool) Destroy(ctx context.Context) {
@@ -134,7 +134,6 @@ func (sp *supervisedPool) Stop() {

 func (sp *supervisedPool) control() {
 	now := time.Now()
-	ctx := context.TODO()
 	const op = errors.Op("supervised pool control tick")

 	// THIS IS A COPY OF WORKERS
@@ -152,7 +151,7 @@ func (sp *supervisedPool) control() {
 		}

 		if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= float64(sp.cfg.TTL) {
-			err = sp.pool.RemoveWorker(ctx, workers[i])
+			err = sp.pool.RemoveWorker(workers[i])
 			if err != nil {
 				sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
 				return
@@ -162,13 +161,12 @@ func (sp *supervisedPool) control() {
 		}

 		if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
-			err = sp.pool.RemoveWorker(ctx, workers[i])
+			err = sp.pool.RemoveWorker(workers[i])
 			if err != nil {
 				sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
 				return
 			}

 			sp.events.Push(PoolEvent{Event: EventMaxMemory, Payload: workers[i]})
-
 			continue
 		}
@@ -194,7 +192,7 @@ func (sp *supervisedPool) control() {

 			// maxWorkerIdle more than diff between now and last used
 			if sp.cfg.IdleTTL-uint64(res) <= 0 {
-				err = sp.pool.RemoveWorker(ctx, workers[i])
+				err = sp.pool.RemoveWorker(workers[i])
 				if err != nil {
 					sp.events.Push(PoolEvent{Event: EventSupervisorError, Payload: errors.E(op, err)})
 					return
diff --git a/sync_worker.go b/sync_worker.go
index cd0f934e..7e4d21cc 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -2,7 +2,6 @@ package roadrunner

 import (
 	"context"
-	"fmt"
 	"time"

 	"github.com/spiral/errors"
@@ -36,12 +35,13 @@ func NewSyncWorker(w WorkerBase) (SyncWorker, error) {

 // Exec payload without TTL timeout.
 func (tw *syncWorker) Exec(p Payload) (Payload, error) {
+	const op = errors.Op("sync worker Exec")
 	if len(p.Body) == 0 && len(p.Context) == 0 {
-		return EmptyPayload, fmt.Errorf("payload can not be empty")
+		return EmptyPayload, errors.E(op, errors.Str("payload can not be empty"))
 	}

 	if tw.w.State().Value() != StateReady {
-		return EmptyPayload, fmt.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String())
+		return EmptyPayload, errors.E(op, errors.Errorf("WorkerProcess is not ready (%s)", tw.w.State().String()))
 	}

 	// set last used time
@@ -51,7 +51,7 @@ func (tw *syncWorker) Exec(p Payload) (Payload, error) {
 	rsp, err := tw.execPayload(p)
 	if err != nil {
 		// just to be more verbose
-		if errors.Is(errors.Exec, err) == false {
+		if errors.Is(errors.ErrSoftJob, err) == false {
 			tw.w.State().Set(StateErrored)
 			tw.w.State().RegisterExec()
 		}
@@ -97,7 +97,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
 		rsp, err := tw.execPayload(p)
 		if err != nil {
 			// just to be more verbose
-			if errors.Is(errors.Exec, err) == false {
+			if errors.Is(errors.ErrSoftJob, err) == false {
 				tw.w.State().Set(StateErrored)
 				tw.w.State().RegisterExec()
 			}
@@ -152,11 +152,11 @@ func (tw *syncWorker) execPayload(p Payload) (Payload, error) {
 	}

 	if !pr.HasFlag(goridge.PayloadControl) {
-		return EmptyPayload, fmt.Errorf("malformed WorkerProcess response")
+		return EmptyPayload, errors.E(op, errors.Str("malformed WorkerProcess response"))
 	}

 	if pr.HasFlag(goridge.PayloadError) {
-		return EmptyPayload, errors.E(op, errors.Exec, errors.Str(string(rsp.Context)))
+		return EmptyPayload, errors.E(op, errors.ErrSoftJob, errors.Str(string(rsp.Context)))
 	}

 	// add streaming support :)
diff --git a/sync_worker_test.go b/sync_worker_test.go
index 69e6ece9..9786d709 100755
--- a/sync_worker_test.go
+++ b/sync_worker_test.go
@@ -70,7 +70,7 @@ func Test_BadPayload(t *testing.T) {
 	assert.Nil(t, res.Body)
 	assert.Nil(t, res.Context)

-	assert.Equal(t, "payload can not be empty", err.Error())
+	assert.Contains(t, err.Error(), "payload can not be empty")
 }

 func Test_NotStarted_String(t *testing.T) {
@@ -98,7 +98,7 @@ func Test_NotStarted_Exec(t *testing.T) {
 	assert.Nil(t, res.Body)
 	assert.Nil(t, res.Context)

-	assert.Equal(t, "WorkerProcess is not ready (inactive)", err.Error())
+	assert.Contains(t, err.Error(), "WorkerProcess is not ready (inactive)")
 }

 func Test_String(t *testing.T) {
@@ -215,10 +215,10 @@ func Test_Error(t *testing.T) {
 	assert.Nil(t, res.Body)
 	assert.Nil(t, res.Context)

-	if errors.Is(errors.Exec, err) == false {
-		t.Fatal("error should be of type errors.Exec")
+	if errors.Is(errors.ErrSoftJob, err) == false {
+		t.Fatal("error should be of type errors.ErrSoftJob")
 	}
-	assert.Contains(t, err.Error(), "exec payload: Exec: hello")
+	assert.Contains(t, err.Error(), "exec payload: SoftJobError: hello")
 }

 func Test_NumExecs(t *testing.T) {
diff --git a/worker_watcher.go b/worker_watcher.go
index 3b83c8ff..8bc147d0 100755
--- a/worker_watcher.go
+++ b/worker_watcher.go
@@ -62,7 +62,7 @@ func (stack *Stack) Pop() (WorkerBase, bool) {

 type WorkerWatcher interface {
 	// AddToWatch used to add stack to wait its state
-	AddToWatch(ctx context.Context, workers []WorkerBase) error
+	AddToWatch(workers []WorkerBase) error

 	// GetFreeWorker provide first free worker
 	GetFreeWorker(ctx context.Context) (WorkerBase, error)
@@ -71,7 +71,7 @@ type WorkerWatcher interface {
 	PushWorker(w WorkerBase)

 	// AllocateNew used to allocate new worker and put in into the WorkerWatcher
-	AllocateNew(ctx context.Context) error
+	AllocateNew() error

 	// Destroy destroys the underlying stack
 	Destroy(ctx context.Context)
@@ -80,7 +80,7 @@ type WorkerWatcher interface {
 	WorkersList() []WorkerBase

 	// RemoveWorker remove worker from the stack
-	RemoveWorker(ctx context.Context, wb WorkerBase) error
+	RemoveWorker(wb WorkerBase) error
 }

 // workerCreateFunc can be nil, but in that case, dead stack will not be replaced
@@ -105,7 +105,7 @@ type workerWatcher struct {
 	events util.EventsHandler
 }

-func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error {
+func (ww *workerWatcher) AddToWatch(workers []WorkerBase) error {
 	for i := 0; i < len(workers); i++ {
 		sw, err := NewSyncWorker(workers[i])
 		if err != nil {
@@ -115,14 +115,14 @@ func (ww *workerWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) e
 		sw.AddListener(ww.events.Push)

 		go func(swc WorkerBase) {
-			ww.wait(ctx, swc)
+			ww.wait(swc)
 		}(sw)
 	}
 	return nil
 }

 func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error) {
-	const op = errors.Op("get_free_worker")
+	const op = errors.Op("GetFreeWorker")
 	// thread safe operation
 	w, stop := ww.stack.Pop()
 	if stop {
@@ -132,19 +132,15 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
 	// handle worker remove state
 	// in this state worker is destroyed by supervisor
 	if w != nil && w.State().Value() == StateRemove {
-		err := ww.RemoveWorker(ctx, w)
+		err := ww.RemoveWorker(w)
 		if err != nil {
 			return nil, err
 		}
 		// try to get next
 		return ww.GetFreeWorker(ctx)
 	}
-
 	// no free stack
 	if w == nil {
-		// TODO allocate timeout
-		tout := time.NewTicker(time.Second * 180)
-		defer tout.Stop()
 		for {
 			select {
 			default:
@@ -157,8 +153,8 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
 				}
 				ww.ReduceWorkersCount()
 				return w, nil
-			case <-tout.C:
-				return nil, errors.E(op, errors.Str("no free workers in the stack"))
+			case <-ctx.Done():
+				return nil, errors.E(op, errors.NoFreeWorkers, errors.Str("no free workers in the stack, timeout exceed"))
 			}
 		}
 	}
@@ -167,12 +163,12 @@ func (ww *workerWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)
 	return w, nil
 }

-func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
+func (ww *workerWatcher) AllocateNew() error {
 	ww.stack.mutex.Lock()
 	const op = errors.Op("allocate new worker")
 	sw, err := ww.allocator()
 	if err != nil {
-		return errors.E(op, err)
+		return errors.E(op, errors.WorkerAllocate, err)
 	}

 	ww.addToWatch(sw)
@@ -187,7 +183,7 @@ func (ww *workerWatcher) AllocateNew(ctx context.Context) error {
 	return nil
 }

-func (ww *workerWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error {
+func (ww *workerWatcher) RemoveWorker(wb WorkerBase) error {
 	ww.stack.mutex.Lock()
 	const op = errors.Op("remove worker")
 	defer ww.stack.mutex.Unlock()
@@ -275,7 +271,7 @@ func (ww *workerWatcher) WorkersList() []WorkerBase {
 	return workersCopy
 }

-func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
+func (ww *workerWatcher) wait(w WorkerBase) {
 	const op = errors.Op("process wait")
 	err := w.Wait()
 	if err != nil {
@@ -300,7 +296,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
 		ww.ReduceWorkersCount()
 		ww.stack.mutex.Unlock()

-		err = ww.AllocateNew(ctx)
+		err = ww.AllocateNew()
 		if err != nil {
 			ww.events.Push(PoolEvent{
 				Event: EventPoolError,
@@ -315,7 +311,7 @@ func (ww *workerWatcher) wait(ctx context.Context, w WorkerBase) {
 	ww.stack.mutex.Unlock()

 	// worker not in the stack (not returned), forget and allocate new
-	err = ww.AllocateNew(ctx)
+	err = ww.AllocateNew()
 	if err != nil {
 		ww.events.Push(PoolEvent{
 			Event: EventPoolError,
@@ -329,6 +325,6 @@ func (ww *workerWatcher) addToWatch(wb WorkerBase) {
 	ww.mutex.Lock()
 	defer ww.mutex.Unlock()
 	go func() {
-		ww.wait(context.Background(), wb)
+		ww.wait(wb)
 	}()
 }
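
The worker_watcher.go hunks above swap a hardcoded 180-second ticker for the caller's context, which is how cfg.AllocateTimeout in static_pool.go bounds the wait for a free worker. A standalone sketch of that pattern built only on the standard library; every name in it is illustrative rather than taken from the repository:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errNoFreeWorkers = errors.New("no free workers in the stack, timeout exceed")

// getFree blocks until a worker ID arrives on the free channel or the caller's
// context expires, mirroring the select on ctx.Done() added to GetFreeWorker.
func getFree(ctx context.Context, free <-chan int) (int, error) {
	select {
	case id := <-free:
		return id, nil
	case <-ctx.Done():
		return 0, errNoFreeWorkers
	}
}

func main() {
	free := make(chan int) // nothing is ever sent: simulates a fully busy pool
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	if _, err := getFree(ctx, free); err != nil {
		// in the real pool this is the point where EventNoFreeWorkers gets pushed
		fmt.Println("timed out:", err)
	}
}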