summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/connection.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-27 12:39:01 +0300
committerValery Piashchynski <[email protected]>2021-07-27 12:39:01 +0300
commit1e59ec2755a9cdafd26864ba532fa4d3eff46ecd (patch)
tree68c7c7e8d9f4d99debc4895ab8469e323c60f47b /plugins/jobs/drivers/beanstalk/connection.go
parentd72181126867c7e8fc05e5ac927bd90d01e0dbc7 (diff)
Initial support for the cancellation via context
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/connection.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/connection.go25
1 files changed, 12 insertions, 13 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go
index 6cc50c07..797b4821 100644
--- a/plugins/jobs/drivers/beanstalk/connection.go
+++ b/plugins/jobs/drivers/beanstalk/connection.go
@@ -1,6 +1,7 @@
package beanstalk
import (
+ "context"
"net"
"sync"
"time"
@@ -54,14 +55,14 @@ func NewConnPool(network, address, tName string, tout time.Duration, log logger.
}, nil
}
-func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
+func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) {
cp.RLock()
defer cp.RUnlock()
id, err := cp.t.Put(body, pri, delay, ttr)
if err != nil {
// errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
+ errN := cp.checkAndRedial(ctx, err)
if errN != nil {
return 0, errN
} else {
@@ -80,14 +81,14 @@ func (cp *ConnPool) Put(body []byte, pri uint32, delay, ttr time.Duration) (uint
// Typically, a client will reserve a job, perform some work, then delete
// the job with Conn.Delete.
-func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) {
+func (cp *ConnPool) Reserve(ctx context.Context, reserveTimeout time.Duration) (uint64, []byte, error) {
cp.RLock()
defer cp.RUnlock()
id, body, err := cp.ts.Reserve(reserveTimeout)
if err != nil {
// errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
+ errN := cp.checkAndRedial(ctx, err)
if errN != nil {
return 0, nil, errN
} else {
@@ -99,14 +100,14 @@ func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error
return id, body, nil
}
-func (cp *ConnPool) Delete(id uint64) error {
+func (cp *ConnPool) Delete(ctx context.Context, id uint64) error {
cp.RLock()
defer cp.RUnlock()
err := cp.conn.Delete(id)
if err != nil {
// errN contains both, err and internal checkAndRedial error
- errN := cp.checkAndRedial(err)
+ errN := cp.checkAndRedial(ctx, err)
if errN != nil {
return errN
} else {
@@ -117,14 +118,12 @@ func (cp *ConnPool) Delete(id uint64) error {
return nil
}
-func (cp *ConnPool) redial() error {
+func (cp *ConnPool) redial(ctx context.Context) error {
const op = errors.Op("connection_pool_redial")
cp.Lock()
// backoff here
- expb := backoff.NewExponentialBackOff()
- // set the retry timeout (minutes)
- expb.MaxElapsedTime = time.Minute * 5
+ expb := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
operation := func() error {
connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout)
@@ -165,7 +164,7 @@ func (cp *ConnPool) redial() error {
var connErrors = map[string]struct{}{"EOF": {}}
-func (cp *ConnPool) checkAndRedial(err error) error {
+func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error {
const op = errors.Op("connection_pool_check_redial")
switch et := err.(type) { //nolint:gocritic
// check if the error
@@ -173,7 +172,7 @@ func (cp *ConnPool) checkAndRedial(err error) error {
switch bErr := et.Err.(type) {
case *net.OpError:
cp.RUnlock()
- errR := cp.redial()
+ errR := cp.redial(ctx)
cp.RLock()
// if redial failed - return
if errR != nil {
@@ -186,7 +185,7 @@ func (cp *ConnPool) checkAndRedial(err error) error {
if _, ok := connErrors[et.Err.Error()]; ok {
// if error is related to the broken connection - redial
cp.RUnlock()
- errR := cp.redial()
+ errR := cp.redial(ctx)
cp.RLock()
// if redial failed - return
if errR != nil {