summaryrefslogtreecommitdiff
path: root/pkg/pool/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/pool/static_pool.go')
-rwxr-xr-xpkg/pool/static_pool.go43
1 files changed, 21 insertions, 22 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 0617cbc0..c8e45b82 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -47,7 +47,7 @@ type StaticPool struct {
allocator worker.Allocator
// err_encoder is the default Exec error encoder
- err_encoder ErrorEncoder //nolint
+ err_encoder ErrorEncoder //nolint:golint,stylecheck
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -159,11 +159,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return sp.Exec(p)
}
- err = sp.checkMaxJobs(w)
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ if sp.cfg.MaxJobs != 0 {
+ sp.checkMaxJobs(w)
+ return rsp, nil
}
-
+ // return worker back
+ sp.ww.Push(w)
return rsp, nil
}
@@ -188,11 +189,13 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
return sp.execWithTTL(ctx, p)
}
- err = sp.checkMaxJobs(w)
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ if sp.cfg.MaxJobs != 0 {
+ sp.checkMaxJobs(w)
+ return rsp, nil
}
+ // return worker back
+ sp.ww.Push(w)
return rsp, nil
}
@@ -206,19 +209,15 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error {
- const op = errors.Op("static_pool_check_max_jobs")
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- w.State().Set(worker.StateDestroyed)
- sp.ww.Remove(w)
- err := sp.ww.Allocate()
- if err != nil {
- return errors.E(op, err)
- }
- } else {
+//go:inline
+func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
+ if w.State().NumExecs() >= sp.cfg.MaxJobs {
+ w.State().Set(worker.StateMaxJobsReached)
sp.ww.Push(w)
+ return
}
- return nil
+
+ sp.ww.Push(w)
}
func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
@@ -281,9 +280,9 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
- ctx, cancel := context.WithTimeout(ctx, timeout)
+ ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
- w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
+ w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...)
if err != nil {
return nil, err
}
@@ -316,7 +315,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
- var workers []worker.BaseProcess
+ workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {