summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/redial.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
committerValery Piashchynski <[email protected]>2021-08-25 18:03:30 +0300
commit3212a5b59b6dcd8aa6edac137e945d42f6f9e0ce (patch)
tree8a8426eb09b2a03cfad35f432c6985c3e13fb853 /plugins/jobs/drivers/amqp/redial.go
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
BoltDB local queue initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/redial.go')
-rw-r--r--plugins/jobs/drivers/amqp/redial.go141
1 files changed, 0 insertions, 141 deletions
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
deleted file mode 100644
index 8dc18b8f..00000000
--- a/plugins/jobs/drivers/amqp/redial.go
+++ /dev/null
@@ -1,141 +0,0 @@
-package amqp
-
-import (
- "time"
-
- "github.com/cenkalti/backoff/v4"
- amqp "github.com/rabbitmq/amqp091-go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/events"
- "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
-)
-
-// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *JobConsumer) redialer() { //nolint:gocognit
- go func() {
- const op = errors.Op("rabbitmq_redial")
-
- for {
- select {
- case err := <-j.conn.NotifyClose(make(chan *amqp.Error)):
- if err == nil {
- return
- }
-
- j.Lock()
-
- // trash the broken publishing channel
- <-j.publishChan
-
- t := time.Now()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeError,
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Error: err,
- Start: time.Now(),
- })
-
- expb := backoff.NewExponentialBackOff()
- // set the retry timeout (minutes)
- expb.MaxElapsedTime = j.retryTimeout
- operation := func() error {
- j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
- var dialErr error
- j.conn, dialErr = amqp.Dial(j.connStr)
- if dialErr != nil {
- return errors.E(op, dialErr)
- }
-
- j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
-
- // re-init connection
- errInit := j.initRabbitMQ()
- if errInit != nil {
- j.log.Error("rabbitmq dial", "error", errInit)
- return errInit
- }
-
- // redeclare consume channel
- var errConnCh error
- j.consumeChan, errConnCh = j.conn.Channel()
- if errConnCh != nil {
- return errors.E(op, errConnCh)
- }
-
- // redeclare publish channel
- pch, errPubCh := j.conn.Channel()
- if errPubCh != nil {
- return errors.E(op, errPubCh)
- }
-
- // start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- return errors.E(op, err)
- }
-
- // put the fresh publishing channel
- j.publishChan <- pch
- // restart listener
- j.listener(deliv)
-
- j.log.Info("queues and subscribers redeclared successfully")
-
- return nil
- }
-
- retryErr := backoff.Retry(operation, expb)
- if retryErr != nil {
- j.Unlock()
- j.log.Error("backoff failed", "error", retryErr)
- return
- }
-
- j.eh.Push(events.JobEvent{
- Event: events.EventPipeActive,
- Pipeline: pipe.Name(),
- Driver: pipe.Driver(),
- Start: t,
- Elapsed: time.Since(t),
- })
-
- j.Unlock()
-
- case <-j.stopCh:
- if j.publishChan != nil {
- pch := <-j.publishChan
- err := pch.Close()
- if err != nil {
- j.log.Error("publish channel close", "error", err)
- }
- }
-
- if j.consumeChan != nil {
- err := j.consumeChan.Close()
- if err != nil {
- j.log.Error("consume channel close", "error", err)
- }
- }
- if j.conn != nil {
- err := j.conn.Close()
- if err != nil {
- j.log.Error("amqp connection close", "error", err)
- }
- }
-
- return
- }
- }
- }()
-}