summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/redial.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/amqp/amqpjobs/redial.go')
-rw-r--r--plugins/amqp/amqpjobs/redial.go68
1 files changed, 34 insertions, 34 deletions
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go
index 0835e3ea..56142e2b 100644
--- a/plugins/amqp/amqpjobs/redial.go
+++ b/plugins/amqp/amqpjobs/redial.go
@@ -11,26 +11,26 @@ import (
)
// redialer used to redial to the rabbitmq in case of the connection interrupts
-func (j *consumer) redialer() { //nolint:gocognit
+func (c *consumer) redialer() { //nolint:gocognit
go func() {
const op = errors.Op("rabbitmq_redial")
for {
select {
- case err := <-j.conn.NotifyClose(make(chan *amqp.Error)):
+ case err := <-c.conn.NotifyClose(make(chan *amqp.Error)):
if err == nil {
return
}
- j.Lock()
+ c.Lock()
// trash the broken publishing channel
- <-j.publishChan
+ <-c.publishChan
t := time.Now()
- pipe := j.pipeline.Load().(*pipeline.Pipeline)
+ pipe := c.pipeline.Load().(*pipeline.Pipeline)
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeError,
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
@@ -40,41 +40,41 @@ func (j *consumer) redialer() { //nolint:gocognit
expb := backoff.NewExponentialBackOff()
// set the retry timeout (minutes)
- expb.MaxElapsedTime = j.retryTimeout
+ expb.MaxElapsedTime = c.retryTimeout
operation := func() error {
- j.log.Warn("rabbitmq reconnecting, caused by", "error", err)
+ c.log.Warn("rabbitmq reconnecting, caused by", "error", err)
var dialErr error
- j.conn, dialErr = amqp.Dial(j.connStr)
+ c.conn, dialErr = amqp.Dial(c.connStr)
if dialErr != nil {
return errors.E(op, dialErr)
}
- j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
+ c.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers")
// re-init connection
- errInit := j.initRabbitMQ()
+ errInit := c.initRabbitMQ()
if errInit != nil {
- j.log.Error("rabbitmq dial", "error", errInit)
+ c.log.Error("rabbitmq dial", "error", errInit)
return errInit
}
// redeclare consume channel
var errConnCh error
- j.consumeChan, errConnCh = j.conn.Channel()
+ c.consumeChan, errConnCh = c.conn.Channel()
if errConnCh != nil {
return errors.E(op, errConnCh)
}
// redeclare publish channel
- pch, errPubCh := j.conn.Channel()
+ pch, errPubCh := c.conn.Channel()
if errPubCh != nil {
return errors.E(op, errPubCh)
}
// start reading messages from the channel
- deliv, err := j.consumeChan.Consume(
- j.queue,
- j.consumeID,
+ deliv, err := c.consumeChan.Consume(
+ c.queue,
+ c.consumeID,
false,
false,
false,
@@ -86,23 +86,23 @@ func (j *consumer) redialer() { //nolint:gocognit
}
// put the fresh publishing channel
- j.publishChan <- pch
+ c.publishChan <- pch
// restart listener
- j.listener(deliv)
+ c.listener(deliv)
- j.log.Info("queues and subscribers redeclared successfully")
+ c.log.Info("queues and subscribers redeclared successfully")
return nil
}
retryErr := backoff.Retry(operation, expb)
if retryErr != nil {
- j.Unlock()
- j.log.Error("backoff failed", "error", retryErr)
+ c.Unlock()
+ c.log.Error("backoff failed", "error", retryErr)
return
}
- j.eh.Push(events.JobEvent{
+ c.eh.Push(events.JobEvent{
Event: events.EventPipeActive,
Pipeline: pipe.Name(),
Driver: pipe.Driver(),
@@ -110,27 +110,27 @@ func (j *consumer) redialer() { //nolint:gocognit
Elapsed: time.Since(t),
})
- j.Unlock()
+ c.Unlock()
- case <-j.stopCh:
- if j.publishChan != nil {
- pch := <-j.publishChan
+ case <-c.stopCh:
+ if c.publishChan != nil {
+ pch := <-c.publishChan
err := pch.Close()
if err != nil {
- j.log.Error("publish channel close", "error", err)
+ c.log.Error("publish channel close", "error", err)
}
}
- if j.consumeChan != nil {
- err := j.consumeChan.Close()
+ if c.consumeChan != nil {
+ err := c.consumeChan.Close()
if err != nil {
- j.log.Error("consume channel close", "error", err)
+ c.log.Error("consume channel close", "error", err)
}
}
- if j.conn != nil {
- err := j.conn.Close()
+ if c.conn != nil {
+ err := c.conn.Close()
if err != nil {
- j.log.Error("amqp connection close", "error", err)
+ c.log.Error("amqp connection close", "error", err)
}
}