summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 23:27:59 +0300
committerValery Piashchynski <[email protected]>2021-07-14 23:27:59 +0300
commit05055561600a240401f6e48e5b6c195d01050d45 (patch)
tree14a14ea57aa41eecbb565e9151633da6c2ac3f8d /plugins
parent3483f964c7e63bf2e5bd97fee872396e8f415393 (diff)
Update SQS item.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/drivers/sqs/item.go27
-rw-r--r--plugins/jobs/drivers/sqs/listener.go4
2 files changed, 17 insertions, 14 deletions
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index 815b68c6..0f03cd20 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -70,9 +70,12 @@ type Options struct {
}
// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry(attempt int64) bool {
+func (o *Options) CanRetry() bool {
// Attempts 1 and 0 has identical effect
- return o.MaxAttempts > (attempt + 1)
+ if o.MaxAttempts == 0 || o.MaxAttempts == 1 {
+ return false
+ }
+ return o.MaxAttempts > (o.approxReceiveCount + 1)
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -122,7 +125,7 @@ func (i *Item) Ack() error {
}
func (i *Item) Nack() error {
- if i.Options.CanRetry(i.Options.approxReceiveCount) {
+ if i.Options.CanRetry() {
return nil
}
@@ -168,42 +171,42 @@ func (i *Item) pack(queue *string) *sqs.SendMessageInput {
}
}
-func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, int, error) {
+func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
- return nil, 0, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
+ return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
}
for i := 0; i < len(attributes); i++ {
if _, ok := msg.MessageAttributes[attributes[i]]; !ok {
- return nil, 0, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i]))
+ return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i]))
}
}
attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue)
if err != nil {
- return nil, 0, errors.E(op, err)
+ return nil, errors.E(op, err)
}
delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
if err != nil {
- return nil, 0, errors.E(op, err)
+ return nil, errors.E(op, err)
}
to, err := strconv.Atoi(*msg.MessageAttributes[job.RRTimeout].StringValue)
if err != nil {
- return nil, 0, errors.E(op, err)
+ return nil, errors.E(op, err)
}
priority, err := strconv.Atoi(*msg.MessageAttributes[job.RRPriority].StringValue)
if err != nil {
- return nil, 0, errors.E(op, err)
+ return nil, errors.E(op, err)
}
recCount, err := strconv.Atoi(msg.Attributes[ApproximateReceiveCount])
if err != nil {
- return nil, 0, errors.E(op, err)
+ return nil, errors.E(op, err)
}
item := &Item{
@@ -223,5 +226,5 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, int,
},
}
- return item, recCount, nil
+ return item, nil
}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index 234ed90f..ded79ae7 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -32,7 +32,7 @@ func (j *JobConsumer) listen() {
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, attempt, err := unpack(&m, j.outputQ.QueueUrl, j.client)
+ item, err := unpack(&m, j.outputQ.QueueUrl, j.client)
if err != nil {
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: j.outputQ.QueueUrl,
@@ -46,7 +46,7 @@ func (j *JobConsumer) listen() {
continue
}
- if item.Options.CanRetry(int64(attempt)) {
+ if item.Options.CanRetry() {
j.pq.Insert(item)
continue
}