-rwxr-xr-x | static_pool.go     | 30
-rwxr-xr-x | supervisor_pool.go |  1
-rwxr-xr-x | sync_worker.go     |  3
3 files changed, 17 insertions, 17 deletions
diff --git a/static_pool.go b/static_pool.go
index 7a5f6103..2337b2fd 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -189,9 +189,9 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
 	return r, err
 }
 
-func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
 	const op = errors.Op("Exec")
-	w, err := p.ww.GetFreeWorker(context.Background())
+	w, err := sp.ww.GetFreeWorker(context.Background())
 	if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
 		return EmptyPayload, errors.E(op, err)
 	} else if err != nil {
@@ -203,27 +203,27 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload,
 	rsp, err := sw.ExecWithContext(ctx, rqs)
 	if err != nil {
 		// soft job errors are allowed
-		if _, jobError := err.(JobError); jobError {
-			if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
-				err := p.ww.AllocateNew(bCtx)
+		if _, jobError := err.(ExecError); jobError {
+			if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+				err := sp.ww.AllocateNew(bCtx)
 				if err != nil {
-					p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+					sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
 				}
 
 				w.State().Set(StateInvalid)
 				err = w.Stop(bCtx)
 				if err != nil {
-					p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+					sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
 				}
 			} else {
-				p.ww.PushWorker(w)
+				sp.ww.PushWorker(w)
 			}
 
 			return EmptyPayload, err
 		}
 
 		sw.State().Set(StateInvalid)
-		p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+		sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
 		errS := w.Stop(bCtx)
 
 		if errS != nil {
@@ -238,21 +238,19 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload,
 		w.State().Set(StateInvalid)
 		err = w.Stop(bCtx)
 		if err != nil {
-			p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+			sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
 		}
 
-		return p.Exec(rqs)
+		return sp.Exec(rqs)
 	}
 
-	if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
-		err = p.ww.AllocateNew(bCtx)
+	if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+		err = sp.ww.AllocateNew(bCtx)
 		if err != nil {
 			return EmptyPayload, err
 		}
 	} else {
-		p.muw.Lock()
-		p.ww.PushWorker(w)
-		p.muw.Unlock()
+		sp.ww.PushWorker(w)
 	}
 	return rsp, nil
 }
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 0293ab8b..5dca3c22 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -13,6 +13,7 @@ const MB = 1024 * 1024
 
 type SupervisedPool interface {
 	Pool
+	// Start used to start watching process for all pool workers
 	Start()
 }
 
diff --git a/sync_worker.go b/sync_worker.go
index 6dd8d8e8..d933077b 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -20,6 +20,7 @@ type SyncWorker interface {
 	// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
 	Exec(rqs Payload) (Payload, error)
 
+	// ExecWithContext used to handle Exec with TTL
 	ExecWithContext(ctx context.Context, p Payload) (Payload, error)
 }
 
@@ -94,7 +95,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
 		rsp, err := tw.execPayload(p)
 		if err != nil {
-			if _, ok := err.(JobError); !ok {
+			if _, ok := err.(ExecError); !ok {
 				tw.w.State().Set(StateErrored)
 				tw.w.State().RegisterExec()
 			}
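
The JobError -> ExecError rename touches the "soft job error" check in both static_pool.go and sync_worker.go: an error of that type means the job itself failed, so the worker is returned to the pool, while any other error invalidates it. Below is a minimal, self-contained sketch of that pattern; ExecError and handle are illustrative stand-ins, not the package's real definitions.

// Sketch of the soft-error check the diff relies on (stand-in types, not the package's own).
package main

import "fmt"

// ExecError stands in for the package's soft job error type.
type ExecError struct{ Msg string }

func (e ExecError) Error() string { return e.Msg }

// handle mirrors the diff's checks: a type assertion on ExecError decides
// whether the worker stays in the pool or gets invalidated.
func handle(err error) string {
	if err == nil {
		return "ok"
	}
	if _, ok := err.(ExecError); ok {
		return "soft job error: return the error, keep the worker"
	}
	return "hard error: invalidate and stop the worker"
}

func main() {
	fmt.Println(handle(nil))
	fmt.Println(handle(ExecError{Msg: "bad payload"}))
	fmt.Println(handle(fmt.Errorf("pipe broken")))
}

The new doc comment on SyncWorker.ExecWithContext describes an Exec bounded by a TTL. A plausible shape for that pattern, assuming a hypothetical execWithContext helper rather than the package's actual implementation, is to run the blocking call in a goroutine and race it against ctx.Done():

// Sketch only: a hypothetical helper showing the ctx-based TTL pattern.
package main

import (
	"context"
	"fmt"
	"time"
)

type result struct {
	payload string
	err     error
}

// execWithContext runs work in a goroutine and returns early if ctx expires.
func execWithContext(ctx context.Context, work func() (string, error)) (string, error) {
	c := make(chan result, 1)
	go func() {
		p, err := work()
		c <- result{payload: p, err: err}
	}()

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case r := <-c:
		return r.payload, r.err
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	// Simulated slow worker: exceeds the 50ms TTL, so ctx.Err() is returned.
	_, err := execWithContext(ctx, func() (string, error) {
		time.Sleep(200 * time.Millisecond)
		return "done", nil
	})
	fmt.Println(err) // context deadline exceeded
}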