summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs/item.go')
-rw-r--r--plugins/jobs/drivers/sqs/item.go73
1 files changed, 54 insertions, 19 deletions
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index ef736be9..815b68c6 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -1,6 +1,7 @@
package sqs
import (
+ "context"
"strconv"
"time"
@@ -60,6 +61,12 @@ type Options struct {
// Maximum number of attempts to receive and process the message
MaxAttempts int64 `json:"max_attempts,omitempty"`
+
+ // Private ================
+ approxReceiveCount int64
+ queue *string
+ receiptHandler *string
+ client *sqs.Client
}
// CanRetry must return true if broker is allowed to re-run the job.
@@ -82,30 +89,52 @@ func (o *Options) TimeoutDuration() time.Duration {
return time.Second * time.Duration(o.Timeout)
}
-func (j *Item) ID() string {
- return j.Ident
+func (i *Item) ID() string {
+ return i.Ident
}
-func (j *Item) Priority() int64 {
- return j.Options.Priority
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
}
// Body packs job payload into binary payload.
-func (j *Item) Body() []byte {
- return utils.AsBytes(j.Payload)
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
}
// Context packs job context (job, id) into binary payload.
// Not used in the sqs, MessageAttributes used instead
-func (j *Item) Context() ([]byte, error) {
+func (i *Item) Context() ([]byte, error) {
return nil, nil
}
-func (j *Item) Ack() error {
+func (i *Item) Ack() error {
+ _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
return nil
}
-func (j *Item) Nack() error {
+func (i *Item) Nack() error {
+ if i.Options.CanRetry(i.Options.approxReceiveCount) {
+ return nil
+ }
+
+ _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
+ QueueUrl: i.Options.queue,
+ ReceiptHandle: i.Options.receiptHandler,
+ })
+
+ if err != nil {
+ return err
+ }
+
return nil
}
@@ -124,22 +153,22 @@ func fromJob(job *job.Job) *Item {
}
}
-func (j *JobConsumer) pack(item *Item) *sqs.SendMessageInput {
+func (i *Item) pack(queue *string) *sqs.SendMessageInput {
return &sqs.SendMessageInput{
- MessageBody: aws.String(item.Payload),
- QueueUrl: j.outputQ.QueueUrl,
- DelaySeconds: int32(item.Options.Delay),
+ MessageBody: aws.String(i.Payload),
+ QueueUrl: queue,
+ DelaySeconds: int32(i.Options.Delay),
MessageAttributes: map[string]types.MessageAttributeValue{
- job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(item.Job)},
- job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Delay)))},
- job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Timeout)))},
- job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.Priority)))},
- job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(item.Options.MaxAttempts)))},
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
+ job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
+ job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))},
},
}
}
-func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) {
+func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, int, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
@@ -185,6 +214,12 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, int, error) {
Timeout: int64(to),
Priority: int64(priority),
MaxAttempts: int64(attempt),
+
+ // private
+ approxReceiveCount: int64(recCount),
+ client: client,
+ queue: queue,
+ receiptHandler: msg.ReceiptHandle,
},
}