diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 21:14:26 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 21:14:26 +0300 |
commit | b38a0fffab5bba5fa7a0e460f0e6b87547ee3eda (patch) | |
tree | 0e3ea6775d0b83210e298d256738ad15669a9e8a | |
parent | 7ea227733e0b1fa59021233e6cd0fd06442fbe50 (diff) |
NPE when stopping RMQ channels.
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | plugins/jobs/drivers/amqp/redial.go | 27 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 2 |
2 files changed, 20 insertions, 9 deletions
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go index 0b52a4d1..4f04484e 100644 --- a/plugins/jobs/drivers/amqp/redial.go +++ b/plugins/jobs/drivers/amqp/redial.go @@ -108,18 +108,27 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit j.Unlock() case <-j.stopCh: - err := j.publishChan.Close() - if err != nil { - j.log.Error("publish channel close", "error", err) + if j.publishChan != nil { + err := j.publishChan.Close() + if err != nil { + j.log.Error("publish channel close", "error", err) + } } - err = j.consumeChan.Close() - if err != nil { - j.log.Error("consume channel close", "error", err) + + if j.consumeChan != nil { + err := j.consumeChan.Close() + if err != nil { + j.log.Error("consume channel close", "error", err) + } } - err = j.conn.Close() - if err != nil { - j.log.Error("amqp connection close", "error", err) + if j.conn != nil { + err := j.conn.Close() + if err != nil { + j.log.Error("amqp connection close", "error", err) + } } + + return } } }() diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 7e2f229c..7e1f6d56 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -101,6 +101,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure sessionToken: globalCfg.SessionToken, secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, + pauseCh: make(chan struct{}), } // PARSE CONFIGURATION ------- @@ -197,6 +198,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf sessionToken: globalCfg.SessionToken, secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, + pauseCh: make(chan struct{}), } // PARSE CONFIGURATION ------- |