diff options
author | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-17 19:55:15 +0300 |
commit | ab690ab9c6ae2b00aef1b501e8b17ff02b5da753 (patch) | |
tree | 0a58b043605ef1d9b09e75b207c236aacb1ed55a /pkg/pool/static_pool.go | |
parent | bd0da830ae345e1ed4a67782bf413673beeba037 (diff) |
Update to go 1.17
Add Stat with tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-x | pkg/pool/static_pool.go | 58 |
1 files changed, 35 insertions, 23 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index b20e4242..3eb0714f 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -238,7 +238,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work return w, nil } -// Destroy all underlying stack (but let them to complete the task). +// Destroy all underlying stack (but let them complete the task). func (sp *StaticPool) Destroy(ctx context.Context) { sp.ww.Destroy(ctx) } @@ -250,36 +250,48 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { switch { case errors.Is(errors.ExecTTL, err): sp.events.Push(events.PoolEvent{Event: events.EventExecTTL, Payload: errors.E(op, err)}) + w.State().Set(worker.StateInvalid) + return nil, err case errors.Is(errors.SoftJob, err): - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - // TODO suspicious logic, redesign - err = sp.ww.Allocate() - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventWorkerConstruct, Payload: errors.E(op, err)}) - } + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + // if max jobs exceed + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + // mark old as invalid and stop w.State().Set(worker.StateInvalid) - err = w.Stop() - if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + errS := w.Stop() + if errS != nil { + return nil, errors.E(op, errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) } - } else { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: err}) - sp.ww.Release(w) + + return nil, err } - } - w.State().Set(worker.StateInvalid) - sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) - // kill worker instead of stop, because worker here might be in the broken state (network) which leads us - // to the error - errS := w.Kill() - if errS != nil { - return nil, errors.E(op, err, errS) - } + // soft jobs errors are allowed, just put the worker back + sp.ww.Release(w) - return nil, errors.E(op, err) + return nil, err + case errors.Is(errors.Network, err): + // in case of network error, we can't stop the worker, we should kill it + w.State().Set(worker.StateInvalid) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) + + // kill the worker instead of sending net packet to it + _ = w.Kill() + + return nil, err + default: + w.State().Set(worker.StateInvalid) + sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) + // stop the worker, worker here might be in the broken state (network) + errS := w.Stop() + if errS != nil { + return nil, errors.E(op, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + } + + return nil, errors.E(op, err) + } } } |