summaryrefslogtreecommitdiff
path: root/plugins/jobs/oooold/broker/sqs/job.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/oooold/broker/sqs/job.go')
-rw-r--r--plugins/jobs/oooold/broker/sqs/job.go80
1 files changed, 0 insertions, 80 deletions
diff --git a/plugins/jobs/oooold/broker/sqs/job.go b/plugins/jobs/oooold/broker/sqs/job.go
deleted file mode 100644
index 50e2c164..00000000
--- a/plugins/jobs/oooold/broker/sqs/job.go
+++ /dev/null
@@ -1,80 +0,0 @@
-package sqs
-
-import (
- "fmt"
- "github.com/aws/aws-sdk-go/aws"
- "github.com/aws/aws-sdk-go/service/sqs"
- "github.com/spiral/jobs/v2"
- "strconv"
- "time"
-)
-
-var jobAttributes = []*string{
- aws.String("rr-job"),
- aws.String("rr-maxAttempts"),
- aws.String("rr-delay"),
- aws.String("rr-timeout"),
- aws.String("rr-retryDelay"),
-}
-
-// pack job metadata into headers
-func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
- return &sqs.SendMessageInput{
- QueueUrl: url,
- DelaySeconds: aws.Int64(int64(j.Options.Delay)),
- MessageBody: aws.String(j.Payload),
- MessageAttributes: map[string]*sqs.MessageAttributeValue{
- "rr-job": {DataType: aws.String("String"), StringValue: aws.String(j.Job)},
- "rr-maxAttempts": {DataType: aws.String("String"), StringValue: awsString(j.Options.Attempts)},
- "rr-delay": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.DelayDuration())},
- "rr-timeout": {DataType: aws.String("String"), StringValue: awsDuration(j.Options.TimeoutDuration())},
- "rr-retryDelay": {DataType: aws.String("Number"), StringValue: awsDuration(j.Options.RetryDuration())},
- },
- }
-}
-
-// unpack restores jobs.Options
-func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) {
- if _, ok := msg.Attributes["ApproximateReceiveCount"]; !ok {
- return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount")
- }
- attempt, _ = strconv.Atoi(*msg.Attributes["ApproximateReceiveCount"])
-
- for _, attr := range jobAttributes {
- if _, ok := msg.MessageAttributes[*attr]; !ok {
- return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", *attr)
- }
- }
-
- j = &jobs.Job{
- Job: *msg.MessageAttributes["rr-job"].StringValue,
- Payload: *msg.Body,
- Options: &jobs.Options{},
- }
-
- if delay, err := strconv.Atoi(*msg.MessageAttributes["rr-delay"].StringValue); err == nil {
- j.Options.Delay = delay
- }
-
- if maxAttempts, err := strconv.Atoi(*msg.MessageAttributes["rr-maxAttempts"].StringValue); err == nil {
- j.Options.Attempts = maxAttempts
- }
-
- if timeout, err := strconv.Atoi(*msg.MessageAttributes["rr-timeout"].StringValue); err == nil {
- j.Options.Timeout = timeout
- }
-
- if retryDelay, err := strconv.Atoi(*msg.MessageAttributes["rr-retryDelay"].StringValue); err == nil {
- j.Options.RetryDelay = retryDelay
- }
-
- return *msg.MessageId, attempt - 1, j, nil
-}
-
-func awsString(n int) *string {
- return aws.String(strconv.Itoa(n))
-}
-
-func awsDuration(d time.Duration) *string {
- return aws.String(strconv.Itoa(int(d.Seconds())))
-}