summaryrefslogtreecommitdiff
path: root/plugins/jobs/broker/sqs/job.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-15 22:12:32 +0300
committerValery Piashchynski <[email protected]>2021-06-15 22:12:32 +0300
commitd4c92e48bada7593b6fbec612a742c599de6e736 (patch)
tree53b6fb81987953b71a77ae094e579a0a7daa407c /plugins/jobs/broker/sqs/job.go
parent9dc98d43b0c0de3e1e1bd8fdc97c122c7c7c594f (diff)
- Jobs plugin initial commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/broker/sqs/job.go')
-rw-r--r--plugins/jobs/broker/sqs/job.go80
1 files changed, 80 insertions, 0 deletions
diff --git a/plugins/jobs/broker/sqs/job.go b/plugins/jobs/broker/sqs/job.go
new file mode 100644
index 00000000..50e2c164
--- /dev/null
+++ b/plugins/jobs/broker/sqs/job.go
@@ -0,0 +1,80 @@
+package sqs
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/sqs"
+ "github.com/spiral/jobs/v2"
+ "strconv"
+ "time"
+)
+
+var jobAttributes = []*string{
+ aws.String("rr-job"),
+ aws.String("rr-maxAttempts"),
+ aws.String("rr-delay"),
+ aws.String("rr-timeout"),
+ aws.String("rr-retryDelay"),
+}
+
+// pack job metadata into headers
+func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
+ return &sqs.SendMessageInput{
+ QueueUrl: url,
+ DelaySeconds: aws.Int64(int64(j.Options.Delay)),
+ MessageBody: aws.String(j.Payload),
+ MessageAttributes: map[string]*sqs.MessageAttributeValue{
+ "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)},
+ "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)},
+ "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())},
+ "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())},
+ "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())},
+ },
+ }
+}
+
+// unpack restores jobs.Options
+func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) {
+ if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok {
+ return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount")
+ }
+ attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"])
+
+ for _, attr := range jobAttributes {
+ if _, ok := msg.MessageAttributes[*attr]; !ok {
+ return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr)
+ }
+ }
+
+ j = &jobs.Job{
+ Job: *msg.MessageAttributes["rr-job"].StringValue,
+ Payload: *msg.Body,
+ Options: &jobs.Options{},
+ }
+
+ if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil {
+ j.Options.Delay = delay
+ }
+
+ if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil {
+ j.Options.Attempts = maxAttempts
+ }
+
+ if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil {
+ j.Options.Timeout = timeout
+ }
+
+ if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil {
+ j.Options.RetryDelay = retryDelay
+ }
+
+ return *msg.MessageId, attempt - 1, j, nil
+}
+
+func awsString(n int) *string {
+ return aws.String(strconv.Itoa(n))
+}
+
+func awsDuration(d time.Duration) *string {
+ return aws.String(strconv.Itoa(int(d.Seconds())))
+}