diff options
author | Valery Piashchynski <[email protected]> | 2021-07-08 17:54:29 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-08 17:54:29 +0300 |
commit | 4566f88004e81d3229222d82614c15346ac2e47d (patch) | |
tree | 05dc6ffeea8d00cb63cc6a51c17ae2afda8aaa5a /plugins/jobs/brokers/amqp/redial.go | |
parent | 5f84c5d5709cff5984a5859651a0bbb1c55fcb0f (diff) |
AMQP update...
Add redialer, consumer, rabbit queues initializer.
Update configuration (.rr.yaml).
Add ack/nack for the jobs main plugin with error handling.
Add Qos, queues bining and declaration.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/redial.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go new file mode 100644 index 00000000..bfb1fbff --- /dev/null +++ b/plugins/jobs/brokers/amqp/redial.go @@ -0,0 +1,58 @@ +package amqp + +import ( + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/streadway/amqp" +) + +// redialer used to redial to the rabbitmq in case of the connection interrupts +func (j *JobsConsumer) redialer() { + go func() { + for err := range j.conn.NotifyClose(make(chan *amqp.Error)) { + if err != nil { + j.logger.Error("connection closed, reconnecting", "error", err) + + expb := backoff.NewExponentialBackOff() + // set the retry timeout (minutes) + expb.MaxElapsedTime = time.Minute * j.retryTimeout + op := func() error { + j.logger.Warn("rabbitmq reconnecting, caused by", "error", err) + + j.Lock() + var dialErr error + j.conn, dialErr = amqp.Dial(j.connStr) + if dialErr != nil { + j.Unlock() + return fmt.Errorf("fail to dial server endpoint: %v", dialErr) + } + j.Unlock() + + j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + + // re-init connection + deliv, errInit := j.initRabbitMQ() + if errInit != nil { + j.Unlock() + j.logger.Error("error while redialing", "error", errInit) + return errInit + } + + // restart listener + j.listener(deliv) + + j.logger.Info("queues and subscribers redeclare succeed") + return nil + } + + retryErr := backoff.Retry(op, expb) + if retryErr != nil { + j.logger.Error("backoff failed", "error", retryErr) + return + } + } + } + }() +} |