diff options
Diffstat (limited to 'plugins/jobs/drivers/sqs/item.go')
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 65 |
1 files changed, 25 insertions, 40 deletions
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index a3039d1b..ea4ac8b7 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -60,23 +60,12 @@ type Options struct { // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` - // Maximum number of attempts to receive and process the message - MaxAttempts int64 `json:"max_attempts,omitempty"` - // Private ================ approxReceiveCount int64 queue *string receiptHandler *string client *sqs.Client -} - -// CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry() bool { - // Attempts 1 and 0 has identical effect - if o.MaxAttempts == 0 || o.MaxAttempts == 1 { - return false - } - return o.MaxAttempts > (o.approxReceiveCount + 1) + requeueCh chan *Item } // DelayDuration returns delay duration in a form of time.Duration. @@ -140,10 +129,6 @@ func (i *Item) Ack() error { } func (i *Item) Nack() error { - if i.Options.CanRetry() { - return nil - } - _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ QueueUrl: i.Options.queue, ReceiptHandle: i.Options.receiptHandler, @@ -156,8 +141,15 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(_ int64) error { - return nil +func (i *Item) Requeue(delay int64) error { + // overwrite the delay + i.Options.Delay = delay + select { + case i.Options.requeueCh <- i: + return nil + default: + return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + } } func fromJob(job *job.Job) *Item { @@ -167,11 +159,10 @@ func fromJob(job *job.Job) *Item { Payload: job.Payload, Headers: job.Headers, Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - Timeout: job.Options.Timeout, - MaxAttempts: job.Options.Attempts, + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, }, } } @@ -182,16 +173,15 @@ func (i *Item) pack(queue *string) *sqs.SendMessageInput { QueueUrl: queue, DelaySeconds: int32(i.Options.Delay), MessageAttributes: map[string]types.MessageAttributeValue{ - job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, - job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, - job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))}, - job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, - job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))}, + job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, + job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, + job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))}, + job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, }, } } -func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error) { +func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { @@ -204,11 +194,6 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error } } - attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue) - if err != nil { - return nil, errors.E(op, err) - } - delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue) if err != nil { return nil, errors.E(op, err) @@ -233,16 +218,16 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error Job: *msg.MessageAttributes[job.RRJob].StringValue, Payload: *msg.Body, Options: &Options{ - Delay: int64(delay), - Timeout: int64(to), - Priority: int64(priority), - MaxAttempts: int64(attempt), + Delay: int64(delay), + Timeout: int64(to), + Priority: int64(priority), // private approxReceiveCount: int64(recCount), - client: client, - queue: queue, + client: j.client, + queue: j.queue, receiptHandler: msg.ReceiptHandle, + requeueCh: j.requeueCh, }, } |