summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-23 08:35:18 +0300
committerValery Piashchynski <[email protected]>2021-07-23 08:35:18 +0300
commitc851b5611a4118b714a05873225916ae07cf4e4a (patch)
tree204f078f78401de2ee88951b2665399c92d6403b
parent54a5c4f2766927427431fd9960c7936dccadeaba (diff)
SQS configuration and tests update
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/jobs/drivers/sqs/config.go17
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go11
-rw-r--r--plugins/jobs/drivers/sqs/item.go8
-rw-r--r--tests/plugins/jobs/jobs_sqs_test.go13
-rw-r--r--tests/plugins/jobs/sqs/.rr-sqs-init.yaml9
5 files changed, 33 insertions, 25 deletions
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
index af5b1cfb..39d0af48 100644
--- a/plugins/jobs/drivers/sqs/config.go
+++ b/plugins/jobs/drivers/sqs/config.go
@@ -2,6 +2,15 @@ package sqs
import "github.com/aws/aws-sdk-go-v2/aws"
+const (
+ attributes string = "attributes"
+ tags string = "tags"
+ queue string = "queue"
+ pref string = "prefetch"
+ visibility string = "visibility_timeout"
+ waitTime string = "wait_time"
+)
+
type GlobalCfg struct {
Key string `mapstructure:"key"`
Secret string `mapstructure:"secret"`
@@ -20,10 +29,10 @@ type Config struct {
// sooner than WaitTimeSeconds. If no messages are available and the wait time
// expires, the call returns successfully with an empty list of messages.
WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"`
- // PrefetchCount is the maximum number of messages to return. Amazon SQS never returns more messages
+ // Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages
// than this value (however, fewer messages might be returned). Valid values: 1 to
// 10. Default: 1.
- PrefetchCount int32 `mapstructure:"pipeline_size"`
+ Prefetch int32 `mapstructure:"prefetch"`
// The name of the new queue. The following limits apply to this name:
//
// * A queue
@@ -87,8 +96,8 @@ func (c *Config) InitDefault() {
c.Queue = aws.String("default")
}
- if c.PrefetchCount == 0 || c.PrefetchCount > 10 {
- c.PrefetchCount = 10
+ if c.Prefetch == 0 || c.Prefetch > 10 {
+ c.Prefetch = 10
}
if c.WaitTimeSeconds == 0 {
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 1ded5bc9..f6311715 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -93,7 +93,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
attributes: pipeCfg.Attributes,
tags: pipeCfg.Tags,
queue: pipeCfg.Queue,
- prefetch: pipeCfg.PrefetchCount,
+ prefetch: pipeCfg.Prefetch,
visibilityTimeout: pipeCfg.VisibilityTimeout,
waitTime: pipeCfg.WaitTimeSeconds,
region: globalCfg.Region,
@@ -142,15 +142,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
const op = errors.Op("new_sqs_consumer")
- const (
- attributes string = "attributes"
- tags string = "tags"
- queue string = "queue"
- pref string = "prefetch"
- visibility string = "visibility_timeout"
- waitTime string = "wait_time"
- )
-
// if no global section
if !cfg.Has(pluginName) {
return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section"))
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index 9b052fbc..325c4781 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -20,7 +20,7 @@ const (
ApproximateReceiveCount string = "ApproximateReceiveCount"
)
-var attributes = []string{
+var itemAttributes = []string{
job.RRJob,
job.RRDelay,
job.RRTimeout,
@@ -194,9 +194,9 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error
return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute"))
}
- for i := 0; i < len(attributes); i++ {
- if _, ok := msg.MessageAttributes[attributes[i]]; !ok {
- return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i]))
+ for i := 0; i < len(itemAttributes); i++ {
+ if _, ok := msg.MessageAttributes[itemAttributes[i]]; !ok {
+ return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", itemAttributes[i]))
}
}
diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go
index 359842dc..7cad3876 100644
--- a/tests/plugins/jobs/jobs_sqs_test.go
+++ b/tests/plugins/jobs/jobs_sqs_test.go
@@ -222,12 +222,13 @@ func declareSQSPipe(t *testing.T) {
client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{
- "driver": "sqs",
- "name": "test-3",
- "queue": "default",
- "prefetch": "100",
- "priority": "3",
- "wait_time": "3",
+ "driver": "sqs",
+ "name": "test-3",
+ "queue": "default",
+ "prefetch": "10",
+ "priority": "3",
+ "visibility_timeout": "0",
+ "wait_time_seconds": "3",
}}
er := &jobsv1beta.Empty{}
diff --git a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml
index f2702906..ca2f7652 100644
--- a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml
+++ b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml
@@ -32,9 +32,16 @@ jobs:
test-1:
driver: sqs
prefetch: 1000
+ visibility_timeout: 0
+ wait_time_seconds: 0
queue: default
+ # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html
attributes:
- MessageRetentionPeriod: 86400
+ DelaySeconds: 0
+ MaximumMessageSize: 262144
+ MessageRetentionPeriod: 345600
+ ReceiveMessageWaitTimeSeconds: 0
+ VisibilityTimeout: 30
tags:
test: "tag"