diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/redial.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/redial.go | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go index d61c75b2..fd19f1ce 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/jobs/drivers/amqp/redial.go @@ -24,6 +24,9 @@ func (j *JobConsumer) redialer() { //nolint:gocognit j.Lock() + // trash the broken publish channel + <-j.publishChan + t := time.Now() pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -63,8 +66,7 @@ func (j *JobConsumer) redialer() { //nolint:gocognit } // redeclare publish channel - var errPubCh error - j.publishChan, errPubCh = j.conn.Channel() + pch, errPubCh := j.conn.Channel() if errPubCh != nil { return errors.E(op, errPubCh) } @@ -83,10 +85,12 @@ func (j *JobConsumer) redialer() { //nolint:gocognit return errors.E(op, err) } + j.publishChan <- pch // restart listener j.listener(deliv) j.log.Info("queues and subscribers redeclared successfully") + return nil } @@ -109,7 +113,8 @@ func (j *JobConsumer) redialer() { //nolint:gocognit case <-j.stopCh: if j.publishChan != nil { - err := j.publishChan.Close() + pch := <-j.publishChan + err := pch.Close() if err != nil { j.log.Error("publish channel close", "error", err) } |