summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-01-23 17:54:58 +0300
committerValery Piashchynski <[email protected]>2021-01-23 17:54:58 +0300
commit01a6fab935fc2e40d7b6c17ab75a20a74ca23d1d (patch)
tree59967433eb8220e60020011873623a7454941314 /pkg/pool/static_pool.go
parent29d6020a9e8a3713b22269ed946547c96c24d3da (diff)
Stabilization PR
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go64
1 files changed, 36 insertions, 28 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 438f936f..d1b726c1 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -132,8 +132,9 @@ func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error {
return sp.ww.RemoveWorker(wb)
}
+// Be careful, sync Exec with ExecWithContext
func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("exec")
+ const op = errors.Op("static_pool_exec")
if sp.cfg.Debug {
return sp.execDebug(p)
}
@@ -152,28 +153,21 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// worker want's to be terminated
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
- w.State().Set(internal.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
- }
+ sp.stopWorker(&w)
return sp.Exec(p)
}
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
- }
- } else {
- sp.ww.PushWorker(w)
+ err = sp.checkMaxJobs(&w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
}
return rsp, nil
}
-func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
+// Be careful, sync with pool.Exec method
+func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
const op = errors.Op("static_pool_exec_with_context")
ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
defer cancel()
@@ -182,32 +176,46 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return payload.Payload{}, errors.E(op, err)
}
- rsp, err := w.ExecWithTimeout(ctx, rqs)
+ rsp, err := w.ExecWithTimeout(ctx, p)
if err != nil {
return sp.errEncoder(err, w)
}
// worker want's to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- w.State().Set(internal.StateInvalid)
- err = w.Stop()
- if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
- }
+ if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
+ sp.stopWorker(&w)
+ return sp.ExecWithContext(ctx, p)
+ }
- return sp.ExecWithContext(ctx, rqs)
+ err = sp.checkMaxJobs(&w)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
}
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- err = sp.ww.AllocateNew()
+ return rsp, nil
+}
+
+func (sp *StaticPool) stopWorker(w *worker.SyncWorker) {
+ const op = errors.Op("static_pool_stop_worker")
+ (*w).State().Set(internal.StateInvalid)
+ err := (*w).Stop()
+ if err != nil {
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: *w, Payload: errors.E(op, err)})
+ }
+}
+
+// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
+func (sp *StaticPool) checkMaxJobs(w *worker.SyncWorker) error {
+ const op = errors.Op("static_pool_check_max_jobs")
+ if sp.cfg.MaxJobs != 0 && (*w).State().NumExecs() >= sp.cfg.MaxJobs {
+ err := sp.ww.AllocateNew()
if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ return errors.E(op, err)
}
} else {
- sp.ww.PushWorker(w)
+ sp.ww.PushWorker(*w)
}
-
- return rsp, nil
+ return nil
}
func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {