diff options
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 27 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 4 |
2 files changed, 17 insertions, 14 deletions
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index 815b68c6..0f03cd20 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -70,9 +70,12 @@ type Options struct { } // CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry(attempt int64) bool { +func (o *Options) CanRetry() bool { // Attempts 1 and 0 has identical effect - return o.MaxAttempts > (attempt + 1) + if o.MaxAttempts == 0 || o.MaxAttempts == 1 { + return false + } + return o.MaxAttempts > (o.approxReceiveCount + 1) } // DelayDuration returns delay duration in a form of time.Duration. @@ -122,7 +125,7 @@ func (i *Item) Ack() error { } func (i *Item) Nack() error { - if i.Options.CanRetry(i.Options.approxReceiveCount) { + if i.Options.CanRetry() { return nil } @@ -168,42 +171,42 @@ func (i *Item) pack(queue *string) *sqs.SendMessageInput { } } -func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, int, error) { +func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { - return nil, 0, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute")) + return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute")) } for i := 0; i < len(attributes); i++ { if _, ok := msg.MessageAttributes[attributes[i]]; !ok { - return nil, 0, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i])) + return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i])) } } attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue) if err != nil { - return nil, 0, errors.E(op, err) + return nil, errors.E(op, err) } delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue) if err != nil { - return nil, 0, errors.E(op, err) + return nil, errors.E(op, err) } to, err := strconv.Atoi(*msg.MessageAttributes[job.RRTimeout].StringValue) if err != nil { - return nil, 0, errors.E(op, err) + return nil, errors.E(op, err) } priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue) if err != nil { - return nil, 0, errors.E(op, err) + return nil, errors.E(op, err) } recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount]) if err != nil { - return nil, 0, errors.E(op, err) + return nil, errors.E(op, err) } item := &Item{ @@ -223,5 +226,5 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, int, }, } - return item, recCount, nil + return item, nil } diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index 234ed90f..ded79ae7 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -32,7 +32,7 @@ func (j *JobConsumer) listen() { for i := 0; i < len(message.Messages); i++ { m := message.Messages[i] - item, attempt, err := unpack(&m, j.outputQ.QueueUrl, j.client) + item, err := unpack(&m, j.outputQ.QueueUrl, j.client) if err != nil { _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ QueueUrl: j.outputQ.QueueUrl, @@ -46,7 +46,7 @@ func (j *JobConsumer) listen() { continue } - if item.Options.CanRetry(int64(attempt)) { + if item.Options.CanRetry() { j.pq.Insert(item) continue } |