summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/redial.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/amqp/amqpjobs/redial.go')
-rw-r--r--plugins/amqp/amqpjobs/redial.go138
1 files changed, 0 insertions, 138 deletions
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
deleted file mode 100644
index 698a34a6..00000000
--- a/plugins/amqp/amqpjobs/redial.go
+++ /dev/null
@@ -1,138 +0,0 @@
-package amqpjobs
-
-import (
- "time"
-
- "github.com/cenkalti/backoff/v4"
- amqp "github.com/rabbitmq/amqp091-go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
-)
-
-// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (c *consumer) redialer() { //nolint:gocognit
- go func() {
- const op = errors.Op("rabbitmq_redial")
-
- for {
- select {
- case err := <-c.conn.NotifyClose(make(chan *amqp.Error)):
- if err == nil {
- return
- }
-
- c.Lock()
-
- // trash the broken publishing channel
- <-c.publishChan
-
- t := time.Now().UTC()
- pipe := c.pipeline.Load().(*pipeline.Pipeline)
-
- c.eh.Push(events.JobEvent{
- Event: events.EventPipeError,
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Error: err,
- Start: time.Now().UTC(),
- })
-
- expb := backoff.NewExponentialBackOff()
- // set the retry timeout (minutes)
- expb.MaxElapsedTime = c.retryTimeout
- operation := func() error {
- c.log.Warn("rabbitmq reconnecting, caused by", "error", err)
- var dialErr error
- c.conn, dialErr = amqp.Dial(c.connStr)
- if dialErr != nil {
- return errors.E(op, dialErr)
- }
-
- c.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
-
- // re-init connection
- errInit := c.initRabbitMQ()
- if errInit != nil {
- c.log.Error("rabbitmq dial", "error", errInit)
- return errInit
- }
-
- // redeclare consume channel
- var errConnCh error
- c.consumeChan, errConnCh = c.conn.Channel()
- if errConnCh != nil {
- return errors.E(op, errConnCh)
- }
-
- // redeclare publish channel
- pch, errPubCh := c.conn.Channel()
- if errPubCh != nil {
- return errors.E(op, errPubCh)
- }
-
- // start reading messages from the channel
- deliv, err := c.consumeChan.Consume(
- c.queue,
- c.consumeID,
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- return errors.E(op, err)
- }
-
- // put the fresh publishing channel
- c.publishChan <- pch
- // restart listener
- c.listener(deliv)
-
- c.log.Info("queues and subscribers redeclared successfully")
-
- return nil
- }
-
- retryErr := backoff.Retry(operation, expb)
- if retryErr != nil {
- c.Unlock()
- c.log.Error("backoff failed", "error", retryErr)
- return
- }
-
- c.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Start: t,
- Elapsed: time.Since(t),
- })
-
- c.Unlock()
-
- case <-c.stopCh:
- pch := <-c.publishChan
- err := pch.Close()
- if err != nil {
- c.log.Error("publish channel close", "error", err)
- }
-
- if c.consumeChan != nil {
- err = c.consumeChan.Close()
- if err != nil {
- c.log.Error("consume channel close", "error", err)
- }
- }
-
- err = c.conn.Close()
- if err != nil {
- c.log.Error("amqp connection close", "error", err)
- }
-
- return
- }
- }
- }()
-}