summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs/item.go')
-rw-r--r--plugins/jobs/drivers/sqs/item.go65
1 files changed, 25 insertions, 40 deletions
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index a3039d1b..ea4ac8b7 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -60,23 +60,12 @@ type Options struct {
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
- // Maximum number of attempts to receive and process the message
- MaxAttempts int64 `json:"max_attempts,omitempty"`
-
// Private ================
approxReceiveCount int64
queue *string
receiptHandler *string
client *sqs.Client
-}
-
-// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry() bool {
- // Attempts 1 and 0 has identical effect
- if o.MaxAttempts == 0 || o.MaxAttempts == 1 {
- return false
- }
- return o.MaxAttempts > (o.approxReceiveCount + 1)
+ requeueCh chan *Item
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -140,10 +129,6 @@ func (i *Item) Ack() error {
}
func (i *Item) Nack() error {
- if i.Options.CanRetry() {
- return nil
- }
-
_, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: i.Options.queue,
ReceiptHandle: i.Options.receiptHandler,
@@ -156,8 +141,15 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ int64) error {
- return nil
+func (i *Item) Requeue(delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return nil
+ default:
+ return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+ }
}
func fromJob(job *job.Job) *Item {
@@ -167,11 +159,10 @@ func fromJob(job *job.Job) *Item {
Payload: job.Payload,
Headers: job.Headers,
Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- Timeout: job.Options.Timeout,
- MaxAttempts: job.Options.Attempts,
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
},
}
}
@@ -182,16 +173,15 @@ func (i *Item) pack(queue *string) *sqs.SendMessageInput {
QueueUrl: queue,
DelaySeconds: int32(i.Options.Delay),
MessageAttributes: map[string]types.MessageAttributeValue{
- job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
- job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
- job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
- job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
- job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))},
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
+ job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
},
}
}
-func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error) {
+func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
@@ -204,11 +194,6 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error
}
}
- attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
if err != nil {
return nil, errors.E(op, err)
@@ -233,16 +218,16 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error
Job: *msg.MessageAttributes[job.RRJob].StringValue,
Payload: *msg.Body,
Options: &Options{
- Delay: int64(delay),
- Timeout: int64(to),
- Priority: int64(priority),
- MaxAttempts: int64(attempt),
+ Delay: int64(delay),
+ Timeout: int64(to),
+ Priority: int64(priority),
// private
approxReceiveCount: int64(recCount),
- client: client,
- queue: queue,
+ client: j.client,
+ queue: j.queue,
receiptHandler: msg.ReceiptHandle,
+ requeueCh: j.requeueCh,
},
}