From ac5a89d63dbed7e0318c7e70ec1b0fa9c98c198a Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 27 Oct 2020 16:01:31 +0300 Subject: Merge changes, pool comments add --- static_pool.go | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) (limited to 'static_pool.go') diff --git a/static_pool.go b/static_pool.go index 7a5f6103..2337b2fd 100755 --- a/static_pool.go +++ b/static_pool.go @@ -189,9 +189,9 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) { return r, err } -func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { +func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { const op = errors.Op("Exec") - w, err := p.ww.GetFreeWorker(context.Background()) + w, err := sp.ww.GetFreeWorker(context.Background()) if err != nil && errors.Is(errors.ErrWatcherStopped, err) { return EmptyPayload, errors.E(op, err) } else if err != nil { @@ -203,27 +203,27 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, rsp, err := sw.ExecWithContext(ctx, rqs) if err != nil { // soft job errors are allowed - if _, jobError := err.(JobError); jobError { - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err := p.ww.AllocateNew(bCtx) + if _, jobError := err.(ExecError); jobError { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err := sp.ww.AllocateNew(bCtx) if err != nil { - p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) } w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } } else { - p.ww.PushWorker(w) + sp.ww.PushWorker(w) } return EmptyPayload, err } sw.State().Set(StateInvalid) - p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) + sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) if errS != nil { @@ -238,21 +238,19 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } - return p.Exec(rqs) + return sp.Exec(rqs) } - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err = p.ww.AllocateNew(bCtx) + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew(bCtx) if err != nil { return EmptyPayload, err } } else { - p.muw.Lock() - p.ww.PushWorker(w) - p.muw.Unlock() + sp.ww.PushWorker(w) } return rsp, nil } -- cgit v1.2.3