summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 21:14:26 +0300
committerValery Piashchynski <[email protected]>2021-07-14 21:14:26 +0300
commitb38a0fffab5bba5fa7a0e460f0e6b87547ee3eda (patch)
tree0e3ea6775d0b83210e298d256738ad15669a9e8a /plugins
parent7ea227733e0b1fa59021233e6cd0fd06442fbe50 (diff)
NPE when stopping RMQ channels.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/drivers/amqp/redial.go27
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go2
2 files changed, 20 insertions, 9 deletions
diff --git a/plugins/jobs/drivers/amqp/redial.go b/plugins/jobs/drivers/amqp/redial.go
index 0b52a4d1..4f04484e 100644
--- a/plugins/jobs/drivers/amqp/redial.go
+++ b/plugins/jobs/drivers/amqp/redial.go
@@ -108,18 +108,27 @@ func (j *JobsConsumer) redialer() { //nolint:gocognit
j.Unlock()
case <-j.stopCh:
- err := j.publishChan.Close()
- if err != nil {
- j.log.Error("publish channel close", "error", err)
+ if j.publishChan != nil {
+ err := j.publishChan.Close()
+ if err != nil {
+ j.log.Error("publish channel close", "error", err)
+ }
}
- err = j.consumeChan.Close()
- if err != nil {
- j.log.Error("consume channel close", "error", err)
+
+ if j.consumeChan != nil {
+ err := j.consumeChan.Close()
+ if err != nil {
+ j.log.Error("consume channel close", "error", err)
+ }
}
- err = j.conn.Close()
- if err != nil {
- j.log.Error("amqp connection close", "error", err)
+ if j.conn != nil {
+ err := j.conn.Close()
+ if err != nil {
+ j.log.Error("amqp connection close", "error", err)
+ }
}
+
+ return
}
}
}()
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 7e2f229c..7e1f6d56 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -101,6 +101,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
sessionToken: globalCfg.SessionToken,
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
+ pauseCh: make(chan struct{}),
}
// PARSE CONFIGURATION -------
@@ -197,6 +198,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
sessionToken: globalCfg.SessionToken,
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
+ pauseCh: make(chan struct{}),
}
// PARSE CONFIGURATION -------