summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-10-27 16:01:31 +0300
committerValery Piashchynski <[email protected]>2020-10-27 16:01:31 +0300
commitac5a89d63dbed7e0318c7e70ec1b0fa9c98c198a (patch)
treefe0d5674a7e011f2a5f1c4d117503c978f509546 /static_pool.go
parentc9af916ae4d78334d348ed1ef7238206f3ecb7a1 (diff)
Merge changes, pool comments add
Diffstat (limited to 'static_pool.go')
-rwxr-xr-xstatic_pool.go30
1 files changed, 14 insertions, 16 deletions
diff --git a/static_pool.go b/static_pool.go
index 7a5f6103..2337b2fd 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -189,9 +189,9 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
return r, err
}
-func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
const op = errors.Op("Exec")
- w, err := p.ww.GetFreeWorker(context.Background())
+ w, err := sp.ww.GetFreeWorker(context.Background())
if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
return EmptyPayload, errors.E(op, err)
} else if err != nil {
@@ -203,27 +203,27 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload,
rsp, err := sw.ExecWithContext(ctx, rqs)
if err != nil {
// soft job errors are allowed
- if _, jobError := err.(JobError); jobError {
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err := p.ww.AllocateNew(bCtx)
+ if _, jobError := err.(ExecError); jobError {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew(bCtx)
if err != nil {
- p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
}
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
} else {
- p.ww.PushWorker(w)
+ sp.ww.PushWorker(w)
}
return EmptyPayload, err
}
sw.State().Set(StateInvalid)
- p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+ sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
if errS != nil {
@@ -238,21 +238,19 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload,
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
- return p.Exec(rqs)
+ return sp.Exec(rqs)
}
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err = p.ww.AllocateNew(bCtx)
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew(bCtx)
if err != nil {
return EmptyPayload, err
}
} else {
- p.muw.Lock()
- p.ww.PushWorker(w)
- p.muw.Unlock()
+ sp.ww.PushWorker(w)
}
return rsp, nil
}