summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go2
-rw-r--r--plugins/jobs/drivers/sqs/listener.go8
2 files changed, 9 insertions, 1 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 43617716..246ff3ba 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -340,7 +340,7 @@ func (j *JobConsumer) Pause(p string) {
j.pauseCh <- struct{}{}
j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
+ Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index 8c5d887e..887f8358 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -78,11 +78,19 @@ func (j *JobConsumer) listen() { //nolint:gocognit
continue
}
+ // No retry
+ if item.Options.MaxAttempts == 0 {
+ j.pq.Insert(item)
+ continue
+ }
+
+ // MaxAttempts option specified
if item.Options.CanRetry() {
j.pq.Insert(item)
continue
}
+ // If MaxAttempts is more than 0, and can't retry -> delete the message
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: j.queueURL,
ReceiptHandle: m.ReceiptHandle,