diff options
author | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
commit | b2da831f47284974551710d2767a7bdde0efa51d (patch) | |
tree | 7d8fee59cdb307110d2fcd872635437e0203321b /plugins/jobs/drivers/beanstalk/redial.go | |
parent | 50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff) |
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation.
Complete redia for the beanstalk.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/redial.go')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/redial.go | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go index e1922517..784337ad 100644 --- a/plugins/jobs/drivers/beanstalk/redial.go +++ b/plugins/jobs/drivers/beanstalk/redial.go @@ -2,30 +2,40 @@ package beanstalk import ( "sync/atomic" + "time" - "github.com/beanstalkd/go-beanstalk" + "github.com/cenkalti/backoff/v4" ) func (j *JobConsumer) redial() { for range j.reconnectCh { // backoff here - - j.Lock() - - var err error - j.conn, err = beanstalk.DialTimeout(j.network, j.addr, j.tout) - if err != nil { - panic(err) + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * 5 + + op := func() error { + err := j.pool.Redial() + if err != nil { + return err + } + + j.log.Info("beanstalk redial was successful") + // reassign a pool + return nil } - j.tube = beanstalk.NewTube(j.conn, j.tName) - j.tubeSet = beanstalk.NewTubeSet(j.conn, j.tName) + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.log.Error("beanstalk backoff failed", "error", retryErr) + continue + } // restart listener if atomic.LoadUint32(&j.listeners) == 1 { + // stop previous listener + j.stopCh <- struct{}{} go j.listen() } - - j.Unlock() } } |