diff options
author | Valery Piashchynski <[email protected]> | 2021-12-23 20:57:41 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-12-23 20:57:41 +0300 |
commit | 5a9c81a4aee940a9bc476f2f3bafd13b411e2786 (patch) | |
tree | d53386065c22bd624bb8f92cb5733d6e728221d7 /pool/static_pool.go | |
parent | 31112495808ae37f38f7b514de1f40b8b8a75238 (diff) |
add Reset method to the pool
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pool/static_pool.go')
-rwxr-xr-x | pool/static_pool.go | 130 |
1 files changed, 73 insertions, 57 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go index 4906788f..7481f84f 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -156,6 +156,79 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) { return rsp, nil } +// Destroy all underlying stack (but let them complete the task). +func (sp *StaticPool) Destroy(ctx context.Context) { + sp.events.Unsubscribe(sp.eventsID) + sp.ww.Destroy(ctx) +} + +func (sp *StaticPool) Reset(ctx context.Context) error { + // destroy all workers + sp.ww.Reset(ctx) + workers, err := sp.allocateWorkers(sp.cfg.NumWorkers) + if err != nil { + return err + } + // add the NEW workers to the watcher + err = sp.ww.Watch(workers) + if err != nil { + return err + } + + return nil +} + +func defaultErrEncoder(sp *StaticPool) ErrorEncoder { + return func(err error, w worker.BaseProcess) (*payload.Payload, error) { + // just push event if on any stage was timeout error + switch { + case errors.Is(errors.ExecTTL, err): + sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err))) + w.State().Set(worker.StateInvalid) + return nil, err + + case errors.Is(errors.SoftJob, err): + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) + + // if max jobs exceed + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { + // mark old as invalid and stop + w.State().Set(worker.StateInvalid) + errS := w.Stop() + if errS != nil { + return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) + } + + return nil, err + } + + // soft jobs errors are allowed, just put the worker back + sp.ww.Release(w) + + return nil, err + case errors.Is(errors.Network, err): + // in case of network error, we can't stop the worker, we should kill it + w.State().Set(worker.StateInvalid) + sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) + + // kill the worker instead of sending net packet to it + _ = w.Kill() + + return nil, err + default: + w.State().Set(worker.StateInvalid) + sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) + // stop the worker, worker here might be in the broken state (network) + errS := w.Stop() + if errS != nil { + return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS)) + } + + return nil, err + } + } +} + // Be careful, sync with pool.Exec method func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) { const op = errors.Op("static_pool_exec_with_context") @@ -225,63 +298,6 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work return w, nil } -// Destroy all underlying stack (but let them complete the task). -func (sp *StaticPool) Destroy(ctx context.Context) { - sp.events.Unsubscribe(sp.eventsID) - sp.ww.Destroy(ctx) -} - -func defaultErrEncoder(sp *StaticPool) ErrorEncoder { - return func(err error, w worker.BaseProcess) (*payload.Payload, error) { - // just push event if on any stage was timeout error - switch { - case errors.Is(errors.ExecTTL, err): - sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err))) - w.State().Set(worker.StateInvalid) - return nil, err - - case errors.Is(errors.SoftJob, err): - sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) - - // if max jobs exceed - if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { - // mark old as invalid and stop - w.State().Set(worker.StateInvalid) - errS := w.Stop() - if errS != nil { - return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS)) - } - - return nil, err - } - - // soft jobs errors are allowed, just put the worker back - sp.ww.Release(w) - - return nil, err - case errors.Is(errors.Network, err): - // in case of network error, we can't stop the worker, we should kill it - w.State().Set(worker.StateInvalid) - sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) - - // kill the worker instead of sending net packet to it - _ = w.Kill() - - return nil, err - default: - w.State().Set(worker.StateInvalid) - sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid()))) - // stop the worker, worker here might be in the broken state (network) - errS := w.Stop() - if errS != nil { - return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS)) - } - - return nil, err - } - } -} - func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { return func() (worker.SyncWorker, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) |