summaryrefslogtreecommitdiff
path: root/plugins/jobs/broker/sqs/job.go
blob: 50e2c164f15cdfc33a259907472ff0d6ddfac26a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package sqs

import (
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/spiral/jobs/v2"
	"strconv"
	"time"
)

// jobAttributes holds the names of the message attributes a job message is
// expected to carry. unpack checks them in this order and reports the first
// one that is absent.
var jobAttributes = aws.StringSlice([]string{
	"rr-job",
	"rr-maxAttempts",
	"rr-delay",
	"rr-timeout",
	"rr-retryDelay",
})

// pack builds the SQS send request for job j addressed to the queue at url.
// The payload becomes the message body, the job's Delay option is used as the
// SQS delivery delay, and the job name and options travel as rr-* message
// attributes (durations are encoded as whole seconds).
func pack(url *string, j *jobs.Job) *sqs.SendMessageInput {
	opts := j.Options

	// attribute wraps value in a message attribute of the given SQS data type.
	attribute := func(dataType string, value *string) *sqs.MessageAttributeValue {
		return &sqs.MessageAttributeValue{
			DataType:    aws.String(dataType),
			StringValue: value,
		}
	}

	headers := make(map[string]*sqs.MessageAttributeValue, 5)
	headers["rr-job"] = attribute("String", aws.String(j.Job))
	headers["rr-maxAttempts"] = attribute("String", awsString(opts.Attempts))
	headers["rr-delay"] = attribute("String", awsDuration(opts.DelayDuration()))
	headers["rr-timeout"] = attribute("String", awsDuration(opts.TimeoutDuration()))
	// NOTE: unlike the other attributes, rr-retryDelay is sent with the
	// "Number" data type.
	headers["rr-retryDelay"] = attribute("Number", awsDuration(opts.RetryDuration()))

	input := new(sqs.SendMessageInput)
	input.QueueUrl = url
	input.DelaySeconds = aws.Int64(int64(opts.Delay))
	input.MessageBody = aws.String(j.Payload)
	input.MessageAttributes = headers

	return input
}

// unpack restores a job and its options from a received SQS message.
//
// It returns the SQS message id, the zero-based attempt number (the
// ApproximateReceiveCount system attribute minus one) and the rebuilt job.
//
// An error is returned, instead of panicking on a nil pointer, when:
//   - msg is nil, or has no body or no message id;
//   - ApproximateReceiveCount is missing, empty or not an integer;
//   - any attribute listed in jobAttributes is missing or has no string value.
//
// Numeric option attributes (rr-delay, rr-maxAttempts, rr-timeout,
// rr-retryDelay) are restored on a best-effort basis: a value that is not an
// integer leaves the corresponding option at its zero value.
func unpack(msg *sqs.Message) (id string, attempt int, j *jobs.Job, err error) {
	if msg == nil {
		return "", 0, nil, fmt.Errorf("missing message")
	}

	count, ok := msg.Attributes["ApproximateReceiveCount"]
	if !ok {
		return "", 0, nil, fmt.Errorf("missing attribute `%s`", "ApproximateReceiveCount")
	}
	if count == nil {
		return "", 0, nil, fmt.Errorf("empty attribute `%s`", "ApproximateReceiveCount")
	}

	received, convErr := strconv.Atoi(*count)
	if convErr != nil {
		return "", 0, nil, fmt.Errorf("invalid attribute `%s`: %w", "ApproximateReceiveCount", convErr)
	}

	// Collect every required attribute once, validating the pointers on the
	// way, so the code below never dereferences nil.
	values := make(map[string]string, len(jobAttributes))
	for _, attr := range jobAttributes {
		name := *attr

		value, ok := msg.MessageAttributes[name]
		if !ok {
			return "", 0, nil, fmt.Errorf("missing message attribute `%s` (mixed queue?)", name)
		}
		if value == nil || value.StringValue == nil {
			return "", 0, nil, fmt.Errorf("message attribute `%s` has no string value", name)
		}

		values[name] = *value.StringValue
	}

	if msg.Body == nil {
		return "", 0, nil, fmt.Errorf("missing message body")
	}
	if msg.MessageId == nil {
		return "", 0, nil, fmt.Errorf("missing message id")
	}

	opts := &jobs.Options{}

	// Best effort: non-numeric option values are deliberately ignored.
	if delay, convErr := strconv.Atoi(values["rr-delay"]); convErr == nil {
		opts.Delay = delay
	}
	if maxAttempts, convErr := strconv.Atoi(values["rr-maxAttempts"]); convErr == nil {
		opts.Attempts = maxAttempts
	}
	if timeout, convErr := strconv.Atoi(values["rr-timeout"]); convErr == nil {
		opts.Timeout = timeout
	}
	if retryDelay, convErr := strconv.Atoi(values["rr-retryDelay"]); convErr == nil {
		opts.RetryDelay = retryDelay
	}

	j = &jobs.Job{
		Job:     values["rr-job"],
		Payload: *msg.Body,
		Options: opts,
	}

	// The receive count starts at 1 on first delivery; attempts are zero-based.
	return *msg.MessageId, received - 1, j, nil
}

// awsString formats n as a base-10 string and returns a pointer to it.
func awsString(n int) *string {
	text := strconv.Itoa(n)
	return aws.String(text)
}

// awsDuration expresses d in whole seconds (the fractional part is dropped)
// and returns a pointer to the base-10 string form of that number.
func awsDuration(d time.Duration) *string {
	seconds := int(d.Seconds())
	text := strconv.Itoa(seconds)
	return aws.String(text)
}