diff options
author | Valery Piashchynski <[email protected]> | 2021-07-23 08:35:18 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-23 08:35:18 +0300 |
commit | c851b5611a4118b714a05873225916ae07cf4e4a (patch) | |
tree | 204f078f78401de2ee88951b2665399c92d6403b | |
parent | 54a5c4f2766927427431fd9960c7936dccadeaba (diff) |
SQS configuration and tests update
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | plugins/jobs/drivers/sqs/config.go | 17 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 11 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 8 | ||||
-rw-r--r-- | tests/plugins/jobs/jobs_sqs_test.go | 13 | ||||
-rw-r--r-- | tests/plugins/jobs/sqs/.rr-sqs-init.yaml | 9 |
5 files changed, 33 insertions, 25 deletions
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go index af5b1cfb..39d0af48 100644 --- a/plugins/jobs/drivers/sqs/config.go +++ b/plugins/jobs/drivers/sqs/config.go @@ -2,6 +2,15 @@ package sqs import "github.com/aws/aws-sdk-go-v2/aws" +const ( + attributes string = "attributes" + tags string = "tags" + queue string = "queue" + pref string = "prefetch" + visibility string = "visibility_timeout" + waitTime string = "wait_time" +) + type GlobalCfg struct { Key string `mapstructure:"key"` Secret string `mapstructure:"secret"` @@ -20,10 +29,10 @@ type Config struct { // sooner than WaitTimeSeconds. If no messages are available and the wait time // expires, the call returns successfully with an empty list of messages. WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"` - // PrefetchCount is the maximum number of messages to return. Amazon SQS never returns more messages + // Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages // than this value (however, fewer messages might be returned). Valid values: 1 to // 10. Default: 1. - PrefetchCount int32 `mapstructure:"pipeline_size"` + Prefetch int32 `mapstructure:"prefetch"` // The name of the new queue. The following limits apply to this name: // // * A queue @@ -87,8 +96,8 @@ func (c *Config) InitDefault() { c.Queue = aws.String("default") } - if c.PrefetchCount == 0 || c.PrefetchCount > 10 { - c.PrefetchCount = 10 + if c.Prefetch == 0 || c.Prefetch > 10 { + c.Prefetch = 10 } if c.WaitTimeSeconds == 0 { diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 1ded5bc9..f6311715 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -93,7 +93,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure attributes: pipeCfg.Attributes, tags: pipeCfg.Tags, queue: pipeCfg.Queue, - prefetch: pipeCfg.PrefetchCount, + prefetch: pipeCfg.Prefetch, visibilityTimeout: pipeCfg.VisibilityTimeout, waitTime: pipeCfg.WaitTimeSeconds, region: globalCfg.Region, @@ -142,15 +142,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { const op = errors.Op("new_sqs_consumer") - const ( - attributes string = "attributes" - tags string = "tags" - queue string = "queue" - pref string = "prefetch" - visibility string = "visibility_timeout" - waitTime string = "wait_time" - ) - // if no global section if !cfg.Has(pluginName) { return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index 9b052fbc..325c4781 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -20,7 +20,7 @@ const ( ApproximateReceiveCount string = "ApproximateReceiveCount" ) -var attributes = []string{ +var itemAttributes = []string{ job.RRJob, job.RRDelay, job.RRTimeout, @@ -194,9 +194,9 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute")) } - for i := 0; i < len(attributes); i++ { - if _, ok := msg.MessageAttributes[attributes[i]]; !ok { - return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i])) + for i := 0; i < len(itemAttributes); i++ { + if _, ok := msg.MessageAttributes[itemAttributes[i]]; !ok { + return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", itemAttributes[i])) } } diff --git a/tests/plugins/jobs/jobs_sqs_test.go b/tests/plugins/jobs/jobs_sqs_test.go index 359842dc..7cad3876 100644 --- a/tests/plugins/jobs/jobs_sqs_test.go +++ b/tests/plugins/jobs/jobs_sqs_test.go @@ -222,12 +222,13 @@ func declareSQSPipe(t *testing.T) { client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) pipe := &jobsv1beta.DeclareRequest{Pipeline: map[string]string{ - "driver": "sqs", - "name": "test-3", - "queue": "default", - "prefetch": "100", - "priority": "3", - "wait_time": "3", + "driver": "sqs", + "name": "test-3", + "queue": "default", + "prefetch": "10", + "priority": "3", + "visibility_timeout": "0", + "wait_time_seconds": "3", }} er := &jobsv1beta.Empty{} diff --git a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml index f2702906..ca2f7652 100644 --- a/tests/plugins/jobs/sqs/.rr-sqs-init.yaml +++ b/tests/plugins/jobs/sqs/.rr-sqs-init.yaml @@ -32,9 +32,16 @@ jobs: test-1: driver: sqs prefetch: 1000 + visibility_timeout: 0 + wait_time_seconds: 0 queue: default + # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SetQueueAttributes.html attributes: - MessageRetentionPeriod: 86400 + DelaySeconds: 0 + MaximumMessageSize: 262144 + MessageRetentionPeriod: 345600 + ReceiveMessageWaitTimeSeconds: 0 + VisibilityTimeout: 30 tags: test: "tag" |