summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-03-28 14:00:54 +0300
committerValery Piashchynski <[email protected]>2021-03-28 14:00:54 +0300
commit2a58b1be2c79f2fe10c0a429878937661645a928 (patch)
treef3a7cd472c75c4dd2a97bcf97cb154731ed81230 /pkg/pool
parent970014530a23d57a3be41c6369ac6456d0b36ae1 (diff)
- Fix bug with the worker reallocating during the response
- Update .golangci and fix new warnings Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pool')
-rw-r--r--pkg/pool/config.go2
-rwxr-xr-xpkg/pool/static_pool.go43
-rwxr-xr-xpkg/pool/static_pool_test.go10
-rwxr-xr-xpkg/pool/supervisor_pool.go1
4 files changed, 27 insertions, 29 deletions
diff --git a/pkg/pool/config.go b/pkg/pool/config.go
index 782f7ce9..2a3dabe4 100644
--- a/pkg/pool/config.go
+++ b/pkg/pool/config.go
@@ -5,7 +5,7 @@ import (
"time"
)
-// Configures the pool behaviour.
+// Configures the pool behavior.
type Config struct {
// Debug flag creates new fresh worker before every request.
Debug bool
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 0617cbc0..c8e45b82 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -47,7 +47,7 @@ type StaticPool struct {
allocator worker.Allocator
// err_encoder is the default Exec error encoder
- err_encoder ErrorEncoder //nolint
+ err_encoder ErrorEncoder //nolint:golint,stylecheck
}
// Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -159,11 +159,12 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
return sp.Exec(p)
}
- err = sp.checkMaxJobs(w)
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ if sp.cfg.MaxJobs != 0 {
+ sp.checkMaxJobs(w)
+ return rsp, nil
}
-
+ // return worker back
+ sp.ww.Push(w)
return rsp, nil
}
@@ -188,11 +189,13 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
return sp.execWithTTL(ctx, p)
}
- err = sp.checkMaxJobs(w)
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
+ if sp.cfg.MaxJobs != 0 {
+ sp.checkMaxJobs(w)
+ return rsp, nil
}
+ // return worker back
+ sp.ww.Push(w)
return rsp, nil
}
@@ -206,19 +209,15 @@ func (sp *StaticPool) stopWorker(w worker.BaseProcess) {
}
// checkMaxJobs check for worker number of executions and kill workers if that number more than sp.cfg.MaxJobs
-func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) error {
- const op = errors.Op("static_pool_check_max_jobs")
- if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
- w.State().Set(worker.StateDestroyed)
- sp.ww.Remove(w)
- err := sp.ww.Allocate()
- if err != nil {
- return errors.E(op, err)
- }
- } else {
+//go:inline
+func (sp *StaticPool) checkMaxJobs(w worker.BaseProcess) {
+ if w.State().NumExecs() >= sp.cfg.MaxJobs {
+ w.State().Set(worker.StateMaxJobsReached)
sp.ww.Push(w)
+ return
}
- return nil
+
+ sp.ww.Push(w)
}
func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
@@ -281,9 +280,9 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
- ctx, cancel := context.WithTimeout(ctx, timeout)
+ ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
- w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
+ w, err := factory.SpawnWorkerWithTimeout(ctxT, cmd(), sp.listeners...)
if err != nil {
return nil, err
}
@@ -316,7 +315,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
// allocate required number of stack
func (sp *StaticPool) allocateWorkers(numWorkers uint64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
- var workers []worker.BaseProcess
+ workers := make([]worker.BaseProcess, 0, numWorkers)
// constant number of stack simplify logic
for i := uint64(0); i < numWorkers; i++ {
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index b1318f9d..2d8aad48 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -213,7 +213,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
}
- var cfg = Config{
+ var cfg2 = Config{
NumWorkers: 1,
AllocateTimeout: time.Second * 5,
DestroyTimeout: time.Second * 5,
@@ -223,7 +223,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
pipe.NewPipeFactory(),
- cfg,
+ cfg2,
AddListeners(listener),
)
assert.NoError(t, err)
@@ -361,7 +361,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
pipe.NewPipeFactory(),
Config{
NumWorkers: 1,
- AllocateTimeout: time.Second,
+ AllocateTimeout: time.Second * 100,
DestroyTimeout: time.Second,
},
)
@@ -432,8 +432,8 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
assert.NoError(t, err)
go func() {
- _, err := p.Exec(payload.Payload{Body: []byte("100")})
- if err != nil {
+ _, errP := p.Exec(payload.Payload{Body: []byte("100")})
+ if errP != nil {
t.Errorf("error executing payload: error %v", err)
}
}()
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index 5abeae7a..273adc30 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -166,7 +166,6 @@ func (sp *supervised) Stop() {
func (sp *supervised) control() {
now := time.Now()
- const op = errors.Op("supervised_pool_control_tick")
// MIGHT BE OUTDATED
// It's a copy of the Workers pointers