summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-17 19:55:15 +0300
committerValery Piashchynski <[email protected]>2021-08-17 19:55:15 +0300
commitab690ab9c6ae2b00aef1b501e8b17ff02b5da753 (patch)
tree0a58b043605ef1d9b09e75b207c236aacb1ed55a /pkg/pool/static_pool.go
parentbd0da830ae345e1ed4a67782bf413673beeba037 (diff)
Update to go 1.17
Add Stat with tests Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go58
1 files changed, 35 insertions, 23 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index b20e4242..3eb0714f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -238,7 +238,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-// Destroy all underlying stack (but let them to complete the task).
+// Destroy all underlying stack (but let them complete the task).
func (sp *StaticPool) Destroy(ctx context.Context) {
sp.ww.Destroy(ctx)
}
@@ -250,36 +250,48 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
switch {
case errors.Is(errors.ExecTTL, err):
sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)})
+ w.State().Set(worker.StateInvalid)
+ return nil, err
case errors.Is(errors.SoftJob, err):
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- // TODO suspicious logic, redesign
- err = sp.ww.Allocate()
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)})
- }
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ // if max jobs exceed
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // mark old as invalid and stop
w.State().Set(worker.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
}
- } else {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err})
- sp.ww.Release(w)
+
+ return nil, err
}
- }
- w.State().Set(worker.StateInvalid)
- sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- // kill worker instead of stop, because worker here might be in the broken state (network) which leads us
- // to the error
- errS := w.Kill()
- if errS != nil {
- return nil, errors.E(op, err, errS)
- }
+ // soft jobs errors are allowed, just put the worker back
+ sp.ww.Release(w)
- return nil, errors.E(op, err)
+ return nil, err
+ case errors.Is(errors.Network, err):
+ // in case of network error, we can't stop the worker, we should kill it
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
+
+ // kill the worker instead of sending net packet to it
+ _ = w.Kill()
+
+ return nil, err
+ default:
+ w.State().Set(worker.StateInvalid)
+ sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
+ // stop the worker, worker here might be in the broken state (network)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, errors.E(op, err)
+ }
}
}