summaryrefslogtreecommitdiff
path: root/pool/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-12-23 20:57:41 +0300
committerValery Piashchynski <[email protected]>2021-12-23 20:57:41 +0300
commit5a9c81a4aee940a9bc476f2f3bafd13b411e2786 (patch)
treed53386065c22bd624bb8f92cb5733d6e728221d7 /pool/static_pool.go
parent31112495808ae37f38f7b514de1f40b8b8a75238 (diff)
add Reset method to the pool
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pool/static_pool.go')
-rwxr-xr-xpool/static_pool.go130
1 files changed, 73 insertions, 57 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 4906788f..7481f84f 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -156,6 +156,79 @@ func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error) {
return rsp, nil
}
+// Destroy all underlying stack (but let them complete the task).
+func (sp *StaticPool) Destroy(ctx context.Context) {
+ sp.events.Unsubscribe(sp.eventsID)
+ sp.ww.Destroy(ctx)
+}
+
+func (sp *StaticPool) Reset(ctx context.Context) error {
+ // destroy all workers
+ sp.ww.Reset(ctx)
+ workers, err := sp.allocateWorkers(sp.cfg.NumWorkers)
+ if err != nil {
+ return err
+ }
+ // add the NEW workers to the watcher
+ err = sp.ww.Watch(workers)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
+ return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
+ // just push event if on any stage was timeout error
+ switch {
+ case errors.Is(errors.ExecTTL, err):
+ sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
+ w.State().Set(worker.StateInvalid)
+ return nil, err
+
+ case errors.Is(errors.SoftJob, err):
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+
+ // if max jobs exceed
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
+ // mark old as invalid and stop
+ w.State().Set(worker.StateInvalid)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, err
+ }
+
+ // soft jobs errors are allowed, just put the worker back
+ sp.ww.Release(w)
+
+ return nil, err
+ case errors.Is(errors.Network, err):
+ // in case of network error, we can't stop the worker, we should kill it
+ w.State().Set(worker.StateInvalid)
+ sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+
+ // kill the worker instead of sending net packet to it
+ _ = w.Kill()
+
+ return nil, err
+ default:
+ w.State().Set(worker.StateInvalid)
+ sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
+ // stop the worker, worker here might be in the broken state (network)
+ errS := w.Stop()
+ if errS != nil {
+ return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
+ }
+
+ return nil, err
+ }
+ }
+}
+
// Be careful, sync with pool.Exec method
func (sp *StaticPool) execWithTTL(ctx context.Context, p *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
@@ -225,63 +298,6 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-// Destroy all underlying stack (but let them complete the task).
-func (sp *StaticPool) Destroy(ctx context.Context) {
- sp.events.Unsubscribe(sp.eventsID)
- sp.ww.Destroy(ctx)
-}
-
-func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
- return func(err error, w worker.BaseProcess) (*payload.Payload, error) {
- // just push event if on any stage was timeout error
- switch {
- case errors.Is(errors.ExecTTL, err):
- sp.events.Send(events.NewEvent(events.EventExecTTL, pluginName, fmt.Sprintf("worker stopped, execTTL timeout elapsed, error: %s", err)))
- w.State().Set(worker.StateInvalid)
- return nil, err
-
- case errors.Is(errors.SoftJob, err):
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
- // if max jobs exceed
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- // mark old as invalid and stop
- w.State().Set(worker.StateInvalid)
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(errors.SoftJob, errors.Errorf("err: %v\nerrStop: %v", err, errS))
- }
-
- return nil, err
- }
-
- // soft jobs errors are allowed, just put the worker back
- sp.ww.Release(w)
-
- return nil, err
- case errors.Is(errors.Network, err):
- // in case of network error, we can't stop the worker, we should kill it
- w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerError, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
-
- // kill the worker instead of sending net packet to it
- _ = w.Kill()
-
- return nil, err
- default:
- w.State().Set(worker.StateInvalid)
- sp.events.Send(events.NewEvent(events.EventWorkerDestruct, pluginName, fmt.Sprintf("error: %s, worker's pid: %d", err, w.Pid())))
- // stop the worker, worker here might be in the broken state (network)
- errS := w.Stop()
- if errS != nil {
- return nil, errors.E(errors.Errorf("err: %v\nerrStop: %v", err, errS))
- }
-
- return nil, err
- }
- }
-}
-
func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)