summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go4
-rw-r--r--plugins/jobs/drivers/sqs/listener.go1
2 files changed, 3 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 18546715..43617716 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -101,7 +101,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
sessionToken: globalCfg.SessionToken,
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}),
+ pauseCh: make(chan struct{}, 1),
}
// PARSE CONFIGURATION -------
@@ -209,7 +209,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
sessionToken: globalCfg.SessionToken,
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
- pauseCh: make(chan struct{}),
+ pauseCh: make(chan struct{}, 1),
}
// PARSE CONFIGURATION -------
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index 5722c19a..8c5d887e 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -22,6 +22,7 @@ func (j *JobConsumer) listen() { //nolint:gocognit
for {
select {
case <-j.pauseCh:
+ j.log.Warn("sqs listener stopped")
return
default:
message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{