diff options
author | Valery Piashchynski <[email protected]> | 2021-06-25 22:07:52 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-25 22:07:52 +0300 |
commit | ae58af3ca1d37cb61f106146e4bf9bd1d033e8b3 (patch) | |
tree | 8d14a41607080298e1f0441a9ad620aee17da39e /pkg/pool | |
parent | e9249c7896331bab97a18a7ee0db17803fdd31fb (diff) |
- Fix bug with an exec_ttl reallocation issue
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'pkg/pool')
-rwxr-xr-x | pkg/pool/static_pool.go | 2 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 74 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 6 |
3 files changed, 11 insertions, 71 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index ab025fa1..e568661f 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -174,7 +174,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo return sp.execDebugWithTTL(ctx, p) } - ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() w, err := sp.getWorker(ctxAlloc, op) if err != nil { diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index ca61dbc4..a1dd21ac 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -43,47 +43,8 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) return sp } -type ttlExec struct { - err error - p payload.Payload -} - -func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { - const op = errors.Op("supervised_exec_with_context") - if sp.cfg.ExecTTL == 0 { - return sp.pool.Exec(rqs) - } - - c := make(chan ttlExec, 1) - ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) - defer cancel() - go func() { - res, err := sp.pool.execWithTTL(ctx, rqs) - if err != nil { - c <- ttlExec{ - err: errors.E(op, err), - p: payload.Payload{}, - } - } - - c <- ttlExec{ - err: nil, - p: res, - } - }() - - for { - select { - case <-ctx.Done(): - return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) - case res := <-c: - if res.err != nil { - return payload.Payload{}, res.err - } - - return res.p, nil - } - } +func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) { + panic("used to satisfy pool interface") } func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { @@ -92,36 +53,15 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { return sp.pool.Exec(rqs) } - c := make(chan ttlExec, 1) ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) defer cancel() - go func() { - res, err := sp.pool.execWithTTL(ctx, rqs) - if err != nil { - c <- ttlExec{ - err: errors.E(op, err), - p: payload.Payload{}, - } - } - c <- ttlExec{ - err: nil, - p: res, - } - }() - - for { - select { - case <-ctx.Done(): - return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) - case res := <-c: - if res.err != nil { - return payload.Payload{}, res.err - } - - return res.p, nil - } + res, err := sp.pool.execWithTTL(ctx, rqs) + if err != nil { + return payload.Payload{}, errors.E(op, err) } + + return res, nil } func (sp *supervised) GetConfig() interface{} { diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index dc307c33..f371b925 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -108,7 +108,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.execWithTTL(context.Background(), payload.Payload{ + resp, err := p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -148,7 +148,7 @@ func TestSupervisedPool_Idle(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.execWithTTL(context.Background(), payload.Payload{ + resp, err := p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -160,7 +160,7 @@ func TestSupervisedPool_Idle(t *testing.T) { time.Sleep(time.Second * 5) // worker should be marked as invalid and reallocated - _, err = p.execWithTTL(context.Background(), payload.Payload{ + _, err = p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) |