diff options
author | Valery Piashchynski <[email protected]> | 2021-07-23 08:35:18 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-23 08:35:18 +0300 |
commit | c851b5611a4118b714a05873225916ae07cf4e4a (patch) | |
tree | 204f078f78401de2ee88951b2665399c92d6403b /plugins | |
parent | 54a5c4f2766927427431fd9960c7936dccadeaba (diff) |
SQS configuration and tests update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/drivers/sqs/config.go | 17 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 11 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 8 |
3 files changed, 18 insertions, 18 deletions
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go index af5b1cfb..39d0af48 100644 --- a/plugins/jobs/drivers/sqs/config.go +++ b/plugins/jobs/drivers/sqs/config.go @@ -2,6 +2,15 @@ package sqs import "github.com/aws/aws-sdk-go-v2/aws" +const ( + attributes string = "attributes" + tags string = "tags" + queue string = "queue" + pref string = "prefetch" + visibility string = "visibility_timeout" + waitTime string = "wait_time" +) + type GlobalCfg struct { Key string `mapstructure:"key"` Secret string `mapstructure:"secret"` @@ -20,10 +29,10 @@ type Config struct { // sooner than WaitTimeSeconds. If no messages are available and the wait time // expires, the call returns successfully with an empty list of messages. WaitTimeSeconds int32 `mapstructure:"wait_time_seconds"` - // PrefetchCount is the maximum number of messages to return. Amazon SQS never returns more messages + // Prefetch is the maximum number of messages to return. Amazon SQS never returns more messages // than this value (however, fewer messages might be returned). Valid values: 1 to // 10. Default: 1. - PrefetchCount int32 `mapstructure:"pipeline_size"` + Prefetch int32 `mapstructure:"prefetch"` // The name of the new queue. The following limits apply to this name: // // * A queue @@ -87,8 +96,8 @@ func (c *Config) InitDefault() { c.Queue = aws.String("default") } - if c.PrefetchCount == 0 || c.PrefetchCount > 10 { - c.PrefetchCount = 10 + if c.Prefetch == 0 || c.Prefetch > 10 { + c.Prefetch = 10 } if c.WaitTimeSeconds == 0 { diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 1ded5bc9..f6311715 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -93,7 +93,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure attributes: pipeCfg.Attributes, tags: pipeCfg.Tags, queue: pipeCfg.Queue, - prefetch: pipeCfg.PrefetchCount, + prefetch: pipeCfg.Prefetch, visibilityTimeout: pipeCfg.VisibilityTimeout, waitTime: pipeCfg.WaitTimeSeconds, region: globalCfg.Region, @@ -142,15 +142,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { const op = errors.Op("new_sqs_consumer") - const ( - attributes string = "attributes" - tags string = "tags" - queue string = "queue" - pref string = "prefetch" - visibility string = "visibility_timeout" - waitTime string = "wait_time" - ) - // if no global section if !cfg.Has(pluginName) { return nil, errors.E(op, errors.Str("no global sqs configuration, global configuration should contain sqs section")) diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index 9b052fbc..325c4781 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -20,7 +20,7 @@ const ( ApproximateReceiveCount string = "ApproximateReceiveCount" ) -var attributes = []string{ +var itemAttributes = []string{ job.RRJob, job.RRDelay, job.RRTimeout, @@ -194,9 +194,9 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error return nil, errors.E(op, errors.Str("failed to unpack the ApproximateReceiveCount attribute")) } - for i := 0; i < len(attributes); i++ { - if _, ok := msg.MessageAttributes[attributes[i]]; !ok { - return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", attributes[i])) + for i := 0; i < len(itemAttributes); i++ { + if _, ok := msg.MessageAttributes[itemAttributes[i]]; !ok { + return nil, errors.E(op, errors.Errorf("missing queue attribute: %s", itemAttributes[i])) } } |