summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xstatic_pool.go30
-rwxr-xr-xsupervisor_pool.go1
-rwxr-xr-xsync_worker.go3
3 files changed, 17 insertions, 17 deletions
diff --git a/static_pool.go b/static_pool.go
index 7a5f6103..2337b2fd 100755
--- a/static_pool.go
+++ b/static_pool.go
@@ -189,9 +189,9 @@ func (sp *StaticPool) execDebug(p Payload) (Payload, error) {
return r, err
}
-func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
+func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) {
const op = errors.Op("Exec")
- w, err := p.ww.GetFreeWorker(context.Background())
+ w, err := sp.ww.GetFreeWorker(context.Background())
if err != nil && errors.Is(errors.ErrWatcherStopped, err) {
return EmptyPayload, errors.E(op, err)
} else if err != nil {
@@ -203,27 +203,27 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload,
rsp, err := sw.ExecWithContext(ctx, rqs)
if err != nil {
// soft job errors are allowed
- if _, jobError := err.(JobError); jobError {
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err := p.ww.AllocateNew(bCtx)
+ if _, jobError := err.(ExecError); jobError {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew(bCtx)
if err != nil {
- p.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
+ sp.events.Push(PoolEvent{Event: EventPoolError, Payload: err})
}
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
} else {
- p.ww.PushWorker(w)
+ sp.ww.PushWorker(w)
}
return EmptyPayload, err
}
sw.State().Set(StateInvalid)
- p.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
+ sp.events.Push(PoolEvent{Event: EventWorkerDestruct, Payload: w})
errS := w.Stop(bCtx)
if errS != nil {
@@ -238,21 +238,19 @@ func (p *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload,
w.State().Set(StateInvalid)
err = w.Stop(bCtx)
if err != nil {
- p.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
+ sp.events.Push(WorkerEvent{Event: EventWorkerError, Worker: w, Payload: err})
}
- return p.Exec(rqs)
+ return sp.Exec(rqs)
}
- if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- err = p.ww.AllocateNew(bCtx)
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ err = sp.ww.AllocateNew(bCtx)
if err != nil {
return EmptyPayload, err
}
} else {
- p.muw.Lock()
- p.ww.PushWorker(w)
- p.muw.Unlock()
+ sp.ww.PushWorker(w)
}
return rsp, nil
}
diff --git a/supervisor_pool.go b/supervisor_pool.go
index 0293ab8b..5dca3c22 100755
--- a/supervisor_pool.go
+++ b/supervisor_pool.go
@@ -13,6 +13,7 @@ const MB = 1024 * 1024
type SupervisedPool interface {
Pool
+ // Start used to start watching process for all pool workers
Start()
}
diff --git a/sync_worker.go b/sync_worker.go
index 6dd8d8e8..d933077b 100755
--- a/sync_worker.go
+++ b/sync_worker.go
@@ -20,6 +20,7 @@ type SyncWorker interface {
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs Payload) (Payload, error)
+ // ExecWithContext used to handle Exec with TTL
ExecWithContext(ctx context.Context, p Payload) (Payload, error)
}
@@ -94,7 +95,7 @@ func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload,
rsp, err := tw.execPayload(p)
if err != nil {
- if _, ok := err.(JobError); !ok {
+ if _, ok := err.(ExecError); !ok {
tw.w.State().Set(StateErrored)
tw.w.State().RegisterExec()
}