diff options
author | Valery Piashchynski <[email protected]> | 2021-07-11 19:54:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-11 19:54:35 +0300 |
commit | 0f70f1e2311640236d74a0a237536779d8d44223 (patch) | |
tree | 8b2e9dc32b5b6bafe418083c33cce3dbb8f277c7 /plugins/jobs/brokers/amqp/redial.go | |
parent | 240b114e1ea3c1414bcd9f4d2c050d56c467222f (diff) |
Update JOBS interface, Renamed Consume -> Run.
Add DYNAMIC declaration of the pipelines. Update Jobs constructor
interface, add FromPipeline method to construct jobs driver (unique)
via the `Declare` RPC call.
Add `Stop` method to gracefully stop all consumers.
Binary heaps `GetMax` to canonical `ExtractMin`.
Other small improvements.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/redial.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/redial.go | 37 |
1 files changed, 28 insertions, 9 deletions
diff --git a/plugins/jobs/brokers/amqp/redial.go b/plugins/jobs/brokers/amqp/redial.go index 874e68c4..16071b78 100644 --- a/plugins/jobs/brokers/amqp/redial.go +++ b/plugins/jobs/brokers/amqp/redial.go @@ -12,29 +12,34 @@ import ( func (j *JobsConsumer) redialer() { //nolint:gocognit go func() { const op = errors.Op("rabbitmq_redial") - for err := range j.conn.NotifyClose(make(chan *amqp.Error)) { - if err != nil { - j.Lock() - j.logger.Error("connection closed, reconnecting", "error", err) + for { + select { + case err := <-j.conn.NotifyClose(make(chan *amqp.Error)): + if err == nil { + return + } + + j.Lock() + j.log.Error("connection closed, reconnecting", "error", err) expb := backoff.NewExponentialBackOff() // set the retry timeout (minutes) expb.MaxElapsedTime = j.retryTimeout op := func() error { - j.logger.Warn("rabbitmq reconnecting, caused by", "error", err) + j.log.Warn("rabbitmq reconnecting, caused by", "error", err) var dialErr error j.conn, dialErr = amqp.Dial(j.connStr) if dialErr != nil { return fmt.Errorf("fail to dial server endpoint: %v", dialErr) } - j.logger.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") + j.log.Info("rabbitmq dial succeed. trying to redeclare queues and subscribers") // re-init connection errInit := j.initRabbitMQ() if errInit != nil { - j.logger.Error("error while redialing", "error", errInit) + j.log.Error("error while redialing", "error", errInit) return errInit } @@ -69,18 +74,32 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit // restart listener j.listener(deliv) - j.logger.Info("queues and subscribers redeclare succeed") + j.log.Info("queues and subscribers redeclare succeed") return nil } retryErr := backoff.Retry(op, expb) if retryErr != nil { j.Unlock() - j.logger.Error("backoff failed", "error", retryErr) + j.log.Error("backoff failed", "error", retryErr) return } j.Unlock() + + case <-j.stopCh: + err := j.publishChan.Close() + if err != nil { + j.log.Error("publish channel close", "error", err) + } + err = j.consumeChan.Close() + if err != nil { + j.log.Error("consume channel close", "error", err) + } + err = j.conn.Close() + if err != nil { + j.log.Error("amqp connection close", "error", err) + } } } }() |