diff options
Diffstat (limited to 'static_pool.go')
-rwxr-xr-x | static_pool.go | 59 |
1 files changed, 34 insertions, 25 deletions
diff --git a/static_pool.go b/static_pool.go index d5511018..b626a499 100755 --- a/static_pool.go +++ b/static_pool.go @@ -82,7 +82,7 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg Poo } // put stack in the pool - err = p.ww.AddToWatch(ctx, workers) + err = p.ww.AddToWatch(workers) if err != nil { return nil, errors.E(op, err) } @@ -132,16 +132,18 @@ func (sp *StaticPool) Workers() (workers []WorkerBase) { return sp.ww.WorkersList() } -func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error { - return sp.ww.RemoveWorker(ctx, wb) +func (sp *StaticPool) RemoveWorker(wb WorkerBase) error { + return sp.ww.RemoveWorker(wb) } func (sp *StaticPool) Exec(p Payload) (Payload, error) { - const op = errors.Op("Exec") + const op = errors.Op("exec") if sp.cfg.Debug { return sp.execDebug(p) } - w, err := sp.ww.GetFreeWorker(context.Background()) + ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.getWorker(ctxGetFree, op) if err != nil { return EmptyPayload, errors.E(op, err) } @@ -171,7 +173,7 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { } if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { - err = sp.ww.AllocateNew(bCtx) + err = sp.ww.AllocateNew() if err != nil { return EmptyPayload, errors.E(op, err) } @@ -189,14 +191,17 @@ func (sp *StaticPool) Exec(p Payload) (Payload, error) { } func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { - const op = errors.Op("Exec with context") - w, err := sp.ww.GetFreeWorker(context.Background()) + const op = errors.Op("exec with context") + ctxGetFree, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) + defer cancel() + w, err := sp.getWorker(ctxGetFree, op) if err != nil { return EmptyPayload, errors.E(op, err) } sw := w.(SyncWorker) + // apply all before function if len(sp.before) > 0 { for i := 0; i < len(sp.before); i++ { rqs = sp.before[i](rqs) @@ -220,7 +225,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload } if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { - err = sp.ww.AllocateNew(bCtx) + err = sp.ww.AllocateNew() if err != nil { return EmptyPayload, errors.E(op, err) } @@ -228,6 +233,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload sp.ww.PushWorker(sw) } + // apply all after functions if len(sp.after) > 0 { for i := 0; i < len(sp.after); i++ { rsp = sp.after[i](rqs, rsp) @@ -237,6 +243,21 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload return rsp, nil } +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (WorkerBase, error) { + // GetFreeWorker function consumes context with timeout + w, err := sp.ww.GetFreeWorker(ctxGetFree) + if err != nil { + // if the error is of kind NoFreeWorkers, it means, that we can't get worker from the stack during the allocate timeout + if errors.Is(errors.NoFreeWorkers, err) { + sp.events.Push(PoolEvent{Event: EventNoFreeWorkers, Payload: errors.E(op, err)}) + return nil, errors.E(op, err) + } + // else if err not nil - return error + return nil, errors.E(op, err) + } + return w, nil +} + // Destroy all underlying stack (but let them to complete the task). func (sp *StaticPool) Destroy(ctx context.Context) { sp.ww.Destroy(ctx) @@ -246,11 +267,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w WorkerBase) (Payload, error) { const op = errors.Op("error encoder") // soft job errors are allowed - if errors.Is(errors.Exec, err) { + if errors.Is(errors.ErrSoftJob, err) { if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err = sp.ww.AllocateNew(bCtx) + err = sp.ww.AllocateNew() if err != nil { - sp.events.Push(PoolEvent{Event: EventPoolError, Payload: errors.E(op, err)}) + sp.events.Push(PoolEvent{Event: EventWorkerConstruct, Payload: errors.E(op, err)}) } w.State().Set(StateInvalid) @@ -318,22 +339,10 @@ func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([] w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) if err != nil { cancel() - return nil, errors.E(op, err) + return nil, errors.E(op, errors.WorkerAllocate, err) } workers = append(workers, w) cancel() } return workers, nil } - -func (sp *StaticPool) checkMaxJobs(ctx context.Context, w WorkerBase) error { - const op = errors.Op("check max jobs") - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - err := sp.ww.AllocateNew(ctx) - if err != nil { - sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) - return errors.E(op, err) - } - } - return nil -} |