diff options
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/redial.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/redial.go | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go index e1922517..784337ad 100644 --- a/plugins/jobs/drivers/beanstalk/redial.go +++ b/plugins/jobs/drivers/beanstalk/redial.go @@ -2,30 +2,40 @@ package beanstalk import ( "sync/atomic" + "time" - "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" ) func (j *JobConsumer) redial() { for range j.reconnectCh { // backoff here - - j.Lock() - - var err error - j.conn, err = beanstalk.DialTimeout(j.network, j.addr, j.tout) - if err != nil { - panic(err) + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * 5 + + op := func() error { + err := j.pool.Redial() + if err != nil { + return err + } + + j.log.Info("beanstalk redial was successful") + // reassign a pool + return nil } - j.tube = beanstalk.NewTube(j.conn, j.tName) - j.tubeSet = beanstalk.NewTubeSet(j.conn, j.tName) + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.log.Error("beanstalk backoff failed", "error", retryErr) + continue + } // restart listener if atomic.LoadUint32(&j.listeners) == 1 { + // stop previous listener + j.stopCh <- struct{}{} go j.listen() } - - j.Unlock() } } |