summaryrefslogtreecommitdiff
path: root/pkg/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-25 22:07:52 +0300
committerValery Piashchynski <[email protected]>2021-06-25 22:07:52 +0300
commitae58af3ca1d37cb61f106146e4bf9bd1d033e8b3 (patch)
tree8d14a41607080298e1f0441a9ad620aee17da39e /pkg/pool
parente9249c7896331bab97a18a7ee0db17803fdd31fb (diff)
- Fix bug with an exec_ttl reallocation issue
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pool')
-rwxr-xr-xpkg/pool/static_pool.go2
-rwxr-xr-xpkg/pool/supervisor_pool.go74
-rw-r--r--pkg/pool/supervisor_test.go6
3 files changed, 11 insertions, 71 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index ab025fa1..e568661f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -174,7 +174,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo
return sp.execDebugWithTTL(ctx, p)
}
- ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
+ ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout)
defer cancel()
w, err := sp.getWorker(ctxAlloc, op)
if err != nil {
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index ca61dbc4..a1dd21ac 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -43,47 +43,8 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig)
return sp
}
-type ttlExec struct {
- err error
- p payload.Payload
-}
-
-func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
- const op = errors.Op("supervised_exec_with_context")
- if sp.cfg.ExecTTL == 0 {
- return sp.pool.Exec(rqs)
- }
-
- c := make(chan ttlExec, 1)
- ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
- defer cancel()
- go func() {
- res, err := sp.pool.execWithTTL(ctx, rqs)
- if err != nil {
- c <- ttlExec{
- err: errors.E(op, err),
- p: payload.Payload{},
- }
- }
-
- c <- ttlExec{
- err: nil,
- p: res,
- }
- }()
-
- for {
- select {
- case <-ctx.Done():
- return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
- case res := <-c:
- if res.err != nil {
- return payload.Payload{}, res.err
- }
-
- return res.p, nil
- }
- }
+func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) {
+ panic("used to satisfy pool interface")
}
func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
@@ -92,36 +53,15 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
return sp.pool.Exec(rqs)
}
- c := make(chan ttlExec, 1)
ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
defer cancel()
- go func() {
- res, err := sp.pool.execWithTTL(ctx, rqs)
- if err != nil {
- c <- ttlExec{
- err: errors.E(op, err),
- p: payload.Payload{},
- }
- }
- c <- ttlExec{
- err: nil,
- p: res,
- }
- }()
-
- for {
- select {
- case <-ctx.Done():
- return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
- case res := <-c:
- if res.err != nil {
- return payload.Payload{}, res.err
- }
-
- return res.p, nil
- }
+ res, err := sp.pool.execWithTTL(ctx, rqs)
+ if err != nil {
+ return payload.Payload{}, errors.E(op, err)
}
+
+ return res, nil
}
func (sp *supervised) GetConfig() interface{} {
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index dc307c33..f371b925 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -108,7 +108,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.execWithTTL(context.Background(), payload.Payload{
+ resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -148,7 +148,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
pid := p.Workers()[0].Pid()
- resp, err := p.execWithTTL(context.Background(), payload.Payload{
+ resp, err := p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})
@@ -160,7 +160,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
time.Sleep(time.Second * 5)
// worker should be marked as invalid and reallocated
- _, err = p.execWithTTL(context.Background(), payload.Payload{
+ _, err = p.Exec(payload.Payload{
Context: []byte(""),
Body: []byte("foo"),
})