summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/redial.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/amqp/redial.go')
-rw-r--r--plugins/jobs/drivers/amqp/redial.go11
1 files changed, 8 insertions, 3 deletions
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index d61c75b2..fd19f1ce 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -24,6 +24,9 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
j.Lock()
+ // trash the broken publish channel
+ <-j.publishChan
+
t := time.Now()
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -63,8 +66,7 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
}
// redeclare publish channel
- var errPubCh error
- j.publishChan, errPubCh = j.conn.Channel()
+ pch, errPubCh := j.conn.Channel()
if errPubCh != nil {
return errors.E(op, errPubCh)
}
@@ -83,10 +85,12 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
return errors.E(op, err)
}
+ j.publishChan <- pch
// restart listener
j.listener(deliv)
j.log.Info("queues and subscribers redeclared successfully")
+
return nil
}
@@ -109,7 +113,8 @@ func (j *JobConsumer) redialer() { //nolint:gocognit
case <-j.stopCh:
if j.publishChan != nil {
- err := j.publishChan.Close()
+ pch := <-j.publishChan
+ err := pch.Close()
if err != nil {
j.log.Error("publish channel close", "error", err)
}