summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/beanstalk/redial.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
committerValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
commitb2da831f47284974551710d2767a7bdde0efa51d (patch)
tree7d8fee59cdb307110d2fcd872635437e0203321b /plugins/jobs/drivers/beanstalk/redial.go
parent50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff)
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation. Complete redia for the beanstalk. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/beanstalk/redial.go')
-rw-r--r--plugins/jobs/drivers/beanstalk/redial.go34
1 files changed, 22 insertions, 12 deletions
diff --git a/plugins/jobs/drivers/beanstalk/redial.go b/plugins/jobs/drivers/beanstalk/redial.go
index e1922517..784337ad 100644
--- a/plugins/jobs/drivers/beanstalk/redial.go
+++ b/plugins/jobs/drivers/beanstalk/redial.go
@@ -2,30 +2,40 @@ package beanstalk
import (
"sync/atomic"
+ "time"
- "github.com/beanstalkd/go-beanstalk"
+ "github.com/cenkalti/backoff/v4"
)
func (j *JobConsumer) redial() {
for range j.reconnectCh {
// backoff here
-
- j.Lock()
-
- var err error
- j.conn, err = beanstalk.DialTimeout(j.network, j.addr, j.tout)
- if err != nil {
- panic(err)
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = time.Minute * 5
+
+ op := func() error {
+ err := j.pool.Redial()
+ if err != nil {
+ return err
+ }
+
+ j.log.Info("beanstalk redial was successful")
+ // reassign a pool
+ return nil
}
- j.tube = beanstalk.NewTube(j.conn, j.tName)
- j.tubeSet = beanstalk.NewTubeSet(j.conn, j.tName)
+ retryErr := backoff.Retry(op, expb)
+ if retryErr != nil {
+ j.log.Error("beanstalk backoff failed", "error", retryErr)
+ continue
+ }
// restart listener
if atomic.LoadUint32(&j.listeners) == 1 {
+ // stop previous listener
+ j.stopCh <- struct{}{}
go j.listen()
}
-
- j.Unlock()
}
}