diff options
Diffstat (limited to 'plugins/amqp/amqpjobs/redial.go')
-rw-r--r-- | plugins/amqp/amqpjobs/redial.go | 68 |
1 files changed, 34 insertions, 34 deletions
diff --git a/plugins/amqp/amqpjobs/redial.go b/plugins/amqp/amqpjobs/redial.go index 0835e3ea..56142e2b 100644 --- a/plugins/amqp/amqpjobs/redial.go +++ b/plugins/amqp/amqpjobs/redial.go @@ -11,26 +11,26 @@ import ( ) // redialer used to redial to the rabbitmq in case of the connection interrupts -func (j *consumer) redialer() { //nolint:gocognit +func (c *consumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") for { select { - case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + case err := <-c.conn.NotifyClose(make(chan *amqp.Error)): if err == nil { return } - j.Lock() + c.Lock() // trash the broken publishing channel - <-j.publishChan + <-c.publishChan t := time.Now() - pipe := j.pipeline.Load().(*pipeline.Pipeline) + pipe := c.pipeline.Load().(*pipeline.Pipeline) - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeError, Pipeline: pipe.Name(), Driver: pipe.Driver(), @@ -40,41 +40,41 @@ func (j *consumer) redialer() { //nolint:gocognit expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) - expb.MaxElapsedTime = j.retryTimeout + expb.MaxElapsedTime = c.retryTimeout operation := func() error { - j.log.Warn("rabbitmq reconnecting, caused by", "error", err) + c.log.Warn("rabbitmq reconnecting, caused by", "error", err) var dialErr error - j.conn, dialErr = amqp.Dial(j.connStr) + c.conn, dialErr = amqp.Dial(c.connStr) if dialErr != nil { return errors.E(op, dialErr) } - j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + c.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") // re-init connection - errInit := j.initRabbitMQ() + errInit := c.initRabbitMQ() if errInit != nil { - j.log.Error("rabbitmq dial", "error", errInit) + c.log.Error("rabbitmq dial", "error", errInit) return errInit } // redeclare consume channel var errConnCh error - j.consumeChan, errConnCh = j.conn.Channel() + c.consumeChan, errConnCh = c.conn.Channel() if errConnCh != nil { return errors.E(op, errConnCh) } // redeclare publish channel - pch, errPubCh := j.conn.Channel() + pch, errPubCh := c.conn.Channel() if errPubCh != nil { return errors.E(op, errPubCh) } // start reading messages from the channel - deliv, err := j.consumeChan.Consume( - j.queue, - j.consumeID, + deliv, err := c.consumeChan.Consume( + c.queue, + c.consumeID, false, false, false, @@ -86,23 +86,23 @@ func (j *consumer) redialer() { //nolint:gocognit } // put the fresh publishing channel - j.publishChan <- pch + c.publishChan <- pch // restart listener - j.listener(deliv) + c.listener(deliv) - j.log.Info("queues and subscribers redeclared successfully") + c.log.Info("queues and subscribers redeclared successfully") return nil } retryErr := backoff.Retry(operation, expb) if retryErr != nil { - j.Unlock() - j.log.Error("backoff failed", "error", retryErr) + c.Unlock() + c.log.Error("backoff failed", "error", retryErr) return } - j.eh.Push(events.JobEvent{ + c.eh.Push(events.JobEvent{ Event: events.EventPipeActive, Pipeline: pipe.Name(), Driver: pipe.Driver(), @@ -110,27 +110,27 @@ func (j *consumer) redialer() { //nolint:gocognit Elapsed: time.Since(t), }) - j.Unlock() + c.Unlock() - case <-j.stopCh: - if j.publishChan != nil { - pch := <-j.publishChan + case <-c.stopCh: + if c.publishChan != nil { + pch := <-c.publishChan err := pch.Close() if err != nil { - j.log.Error("publish channel close", "error", err) + c.log.Error("publish channel close", "error", err) } } - if j.consumeChan != nil { - err := j.consumeChan.Close() + if c.consumeChan != nil { + err := c.consumeChan.Close() if err != nil { - j.log.Error("consume channel close", "error", err) + c.log.Error("consume channel close", "error", err) } } - if j.conn != nil { - err := j.conn.Close() + if c.conn != nil { + err := c.conn.Close() if err != nil { - j.log.Error("amqp connection close", "error", err) + c.log.Error("amqp connection close", "error", err) } } |