summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/redial.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-08 17:54:29 +0300
committerValery Piashchynski <[email protected]>2021-07-08 17:54:29 +0300
commit4566f88004e81d3229222d82614c15346ac2e47d (patch)
tree05dc6ffeea8d00cb63cc6a51c17ae2afda8aaa5a /plugins/jobs/brokers/amqp/redial.go
parent5f84c5d5709cff5984a5859651a0bbb1c55fcb0f (diff)
AMQP update...
Add redialer, consumer, rabbit queues initializer. Update configuration (.rr.yaml). Add ack/nack for the jobs main plugin with error handling. Add Qos, queues bining and declaration. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/redial.go')
-rw-r--r--plugins/jobs/brokers/amqp/redial.go58
1 files changed, 58 insertions, 0 deletions
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go
new file mode 100644
index 00000000..bfb1fbff
--- /dev/null
+++ b/plugins/jobs/brokers/amqp/redial.go
@@ -0,0 +1,58 @@
+package amqp
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/streadway/amqp"
+)
+
+// redialer used to redial to the rabbitmq in case of the connection interrupts
+func (j *JobsConsumer) redialer() {
+ go func() {
+ for err := range j.conn.NotifyClose(make(chan *amqp.Error)) {
+ if err != nil {
+ j.logger.Error("connection closed, reconnecting", "error", err)
+
+ expb := backoff.NewExponentialBackOff()
+ // set the retry timeout (minutes)
+ expb.MaxElapsedTime = time.Minute * j.retryTimeout
+ op := func() error {
+ j.logger.Warn("rabbitmq reconnecting, caused by", "error", err)
+
+ j.Lock()
+ var dialErr error
+ j.conn, dialErr = amqp.Dial(j.connStr)
+ if dialErr != nil {
+ j.Unlock()
+ return fmt.Errorf("fail to dial server endpoint: %v", dialErr)
+ }
+ j.Unlock()
+
+ j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
+
+ // re-init connection
+ deliv, errInit := j.initRabbitMQ()
+ if errInit != nil {
+ j.Unlock()
+ j.logger.Error("error while redialing", "error", errInit)
+ return errInit
+ }
+
+ // restart listener
+ j.listener(deliv)
+
+ j.logger.Info("queues and subscribers redeclare succeed")
+ return nil
+ }
+
+ retryErr := backoff.Retry(op, expb)
+ if retryErr != nil {
+ j.logger.Error("backoff failed", "error", retryErr)
+ return
+ }
+ }
+ }
+ }()
+}