diff options
Diffstat (limited to 'plugins/jobs/drivers/sqs/item.go')
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 73 |
1 files changed, 54 insertions, 19 deletions
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index ef736be9..815b68c6 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -1,6 +1,7 @@ package sqs import ( + "context" "strconv" "time" @@ -60,6 +61,12 @@ type Options struct { // Maximum number of attempts to receive and process the message MaxAttempts int64 `json:"max_attempts,omitempty"` + + // Private ================ + approxReceiveCount int64 + queue *string + receiptHandler *string + client *sqs.Client } // CanRetry must return true if broker is allowed to re-run the job. @@ -82,30 +89,52 @@ func (o *Options) TimeoutDuration() time.Duration { return time.Second * time.Duration(o.Timeout) } -func (j *Item) ID() string { - return j.Ident +func (i *Item) ID() string { + return i.Ident } -func (j *Item) Priority() int64 { - return j.Options.Priority +func (i *Item) Priority() int64 { + return i.Options.Priority } // Body packs job payload into binary payload. -func (j *Item) Body() []byte { - return utils.AsBytes(j.Payload) +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) } // Context packs job context (job, id) into binary payload. // Not used in the sqs, MessageAttributes used instead -func (j *Item) Context() ([]byte, error) { +func (i *Item) Context() ([]byte, error) { return nil, nil } -func (j *Item) Ack() error { +func (i *Item) Ack() error { + _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + return nil } -func (j *Item) Nack() error { +func (i *Item) Nack() error { + if i.Options.CanRetry(i.Options.approxReceiveCount) { + return nil + } + + _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + return nil } @@ -124,22 +153,22 @@ func fromJob(job *job.Job) *Item { } } -func (j *JobConsumer) pack(item *Item) *sqs.SendMessageInput { +func (i *Item) pack(queue *string) *sqs.SendMessageInput { return &sqs.SendMessageInput{ - MessageBody: aws.String(item.Payload), - QueueUrl: j.outputQ.QueueUrl, - DelaySeconds: int32(item.Options.Delay), + MessageBody: aws.String(i.Payload), + QueueUrl: queue, + DelaySeconds: int32(i.Options.Delay), MessageAttributes: map[string]types.MessageAttributeValue{ - job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(item.Job)}, - job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Delay)))}, - job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Timeout)))}, - job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Priority)))}, - job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.MaxAttempts)))}, + job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, + job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, + job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))}, + job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, + job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))}, }, } } -func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) { +func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, int, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { @@ -185,6 +214,12 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) { Timeout: int64(to), Priority: int64(priority), MaxAttempts: int64(attempt), + + // private + approxReceiveCount: int64(recCount), + client: client, + queue: queue, + receiptHandler: msg.ReceiptHandle, }, } |