diff options
author | Valery Piashchynski <[email protected]> | 2020-10-27 16:01:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-10-27 16:01:31 +0300 |
commit | ac5a89d63dbed7e0318c7e70ec1b0fa9c98c198a (patch) | |
tree | fe0d5674a7e011f2a5f1c4d117503c978f509546 /static_pool.go | |
parent | c9af916ae4d78334d348ed1ef7238206f3ecb7a1 (diff) |
Merge changes, pool comments add
Diffstat (limited to 'static_pool.go')
-rwxr-xr-x | static_pool.go | 30 |
1 files changed, 14 insertions, 16 deletions
diff --git a/static_pool.go b/static_pool.go index 7a5f6103..2337b2fd 100755 --- a/static_pool.go +++ b/static_pool.go @@ -189,9 +189,9 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) { return r, err } -func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { +func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) { const op = errors.Op("Exec") - w, err := p.ww.GetFreeWorker(context.Background()) + w, err := sp.ww.GetFreeWorker(context.Background()) if err != nil && errors.Is(errors.ErrWatcherStopped, err) { return EmptyPayload, errors.E(op, err) } else if err != nil { @@ -203,27 +203,27 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, rsp, err := sw.ExecWithContext(ctx, rqs) if err != nil { // soft job errors are allowed - if _, jobError := err.(JobError); jobError { - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err := p.ww.AllocateNew(bCtx) + if _, jobError := err.(ExecError); jobError { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err := sp.ww.AllocateNew(bCtx) if err != nil { - p.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) + sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err}) } w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } } else { - p.ww.PushWorker(w) + sp.ww.PushWorker(w) } return EmptyPayload, err } sw.State().Set(StateInvalid) - p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) + sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w}) errS := w.Stop(bCtx) if errS != nil { @@ -238,21 +238,19 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, w.State().Set(StateInvalid) err = w.Stop(bCtx) if err != nil { - p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) + sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err}) } - return p.Exec(rqs) + return sp.Exec(rqs) } - if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - err = p.ww.AllocateNew(bCtx) + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + err = sp.ww.AllocateNew(bCtx) if err != nil { return EmptyPayload, err } } else { - p.muw.Lock() - p.ww.PushWorker(w) - p.muw.Unlock() + sp.ww.PushWorker(w) } return rsp, nil } |