diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/connection.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/connection.go | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/plugins/jobs/drivers/beanstalk/connection.go b/plugins/jobs/drivers/beanstalk/connection.go index 797b4821..ae223f39 100644 --- a/plugins/jobs/drivers/beanstalk/connection.go +++ b/plugins/jobs/drivers/beanstalk/connection.go @@ -55,14 +55,16 @@ func NewConnPool(network, address, tName string, tout time.Duration, log logger. }, nil } -func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { +// Put the payload +// TODO use the context ?? +func (cp *ConnPool) Put(_ context.Context, body []byte, pri uint32, delay, ttr time.Duration) (uint64, error) { cp.RLock() defer cp.RUnlock() id, err := cp.t.Put(body, pri, delay, ttr) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(ctx, err) + errN := cp.checkAndRedial(err) if errN != nil { return 0, errN } else { @@ -81,14 +83,14 @@ func (cp *ConnPool) Put(ctx context.Context, body []byte, pri uint32, delay, ttr // Typically, a client will reserve a job, perform some work, then delete // the job with Conn.Delete. -func (cp *ConnPool) Reserve(ctx context.Context, reserveTimeout time.Duration) (uint64, []byte, error) { +func (cp *ConnPool) Reserve(reserveTimeout time.Duration) (uint64, []byte, error) { cp.RLock() defer cp.RUnlock() id, body, err := cp.ts.Reserve(reserveTimeout) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(ctx, err) + errN := cp.checkAndRedial(err) if errN != nil { return 0, nil, errN } else { @@ -107,7 +109,7 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { err := cp.conn.Delete(id) if err != nil { // errN contains both, err and internal checkAndRedial error - errN := cp.checkAndRedial(ctx, err) + errN := cp.checkAndRedial(err) if errN != nil { return errN } else { @@ -118,12 +120,14 @@ func (cp *ConnPool) Delete(ctx context.Context, id uint64) error { return nil } -func (cp *ConnPool) redial(ctx context.Context) error { +func (cp *ConnPool) redial() error { const op = errors.Op("connection_pool_redial") cp.Lock() // backoff here - expb := backoff.WithContext(backoff.NewExponentialBackOff(), ctx) + expb := backoff.NewExponentialBackOff() + // TODO set via config + expb.MaxElapsedTime = time.Minute operation := func() error { connT, err := beanstalk.DialTimeout(cp.network, cp.address, cp.tout) @@ -164,7 +168,7 @@ func (cp *ConnPool) redial(ctx context.Context) error { var connErrors = map[string]struct{}{"EOF": {}} -func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { +func (cp *ConnPool) checkAndRedial(err error) error { const op = errors.Op("connection_pool_check_redial") switch et := err.(type) { //nolint:gocritic // check if the error @@ -172,7 +176,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { switch bErr := et.Err.(type) { case *net.OpError: cp.RUnlock() - errR := cp.redial(ctx) + errR := cp.redial() cp.RLock() // if redial failed - return if errR != nil { @@ -185,7 +189,7 @@ func (cp *ConnPool) checkAndRedial(ctx context.Context, err error) error { if _, ok := connErrors[et.Err.Error()]; ok { // if error is related to the broken connection - redial cp.RUnlock() - errR := cp.redial(ctx) + errR := cp.redial() cp.RLock() // if redial failed - return if errR != nil { |