diff options
author | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
commit | b2da831f47284974551710d2767a7bdde0efa51d (patch) | |
tree | 7d8fee59cdb307110d2fcd872635437e0203321b /plugins/jobs/drivers/sqs | |
parent | 50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff) |
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation.
Complete redia for the beanstalk.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r-- | plugins/jobs/drivers/sqs/config.go | 8 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 43 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 17 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 41 |
4 files changed, 93 insertions, 16 deletions
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go index 0b4e8157..af5b1cfb 100644 --- a/plugins/jobs/drivers/sqs/config.go +++ b/plugins/jobs/drivers/sqs/config.go @@ -1,5 +1,7 @@ package sqs +import "github.com/aws/aws-sdk-go-v2/aws" + type GlobalCfg struct { Key string `mapstructure:"key"` Secret string `mapstructure:"secret"` @@ -36,7 +38,7 @@ type Config struct { // Queue URLs and names are case-sensitive. // // This member is required. - Queue string `mapstructure:"queue"` + Queue *string `mapstructure:"queue"` // A map of attributes with their corresponding values. The following lists the // names, descriptions, and values of the special request parameters that the @@ -81,8 +83,8 @@ func (c *GlobalCfg) InitDefault() { } func (c *Config) InitDefault() { - if c.Queue == "" { - c.Queue = "default" + if c.Queue == nil { + c.Queue = aws.String("default") } if c.PrefetchCount == 0 || c.PrefetchCount > 10 { diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 7e1f6d56..18546715 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -34,7 +34,7 @@ type JobConsumer struct { sessionToken string region string endpoint string - queue string + queue *string messageGroupID string waitTime int32 prefetch int32 @@ -47,8 +47,8 @@ type JobConsumer struct { attributes map[string]string tags map[string]string - client *sqs.Client - outputQ *sqs.CreateQueueOutput + client *sqs.Client + queueURL *string pauseCh chan struct{} } @@ -120,11 +120,22 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure }) }) - jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags}) + out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) if err != nil { return nil, errors.E(op, err) } + // assign a queue URL + jb.queueURL = out.QueueUrl + + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require + time.Sleep(time.Second * 2) + return jb, nil } @@ -189,7 +200,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf messageGroupID: uuid.NewString(), attributes: attr, tags: tg, - queue: pipe.String(queue, "default"), + queue: aws.String(pipe.String(queue, "default")), prefetch: int32(pipe.Int(pref, 10)), visibilityTimeout: int32(pipe.Int(visibility, 0)), waitTime: int32(pipe.Int(waitTime, 0)), @@ -217,11 +228,22 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf }) }) - jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags}) + out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) if err != nil { return nil, errors.E(op, err) } + // assign a queue URL + jb.queueURL = out.QueueUrl + + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require + time.Sleep(time.Second * 2) + return jb, nil } @@ -245,7 +267,7 @@ func (j *JobConsumer) Push(jb *job.Job) error { // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. - _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl)) + _, err := j.client.SendMessage(context.Background(), msg.pack(j.queueURL)) if err != nil { return errors.E(op, err) } @@ -274,6 +296,13 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { // start listener go j.listen() + j.eh.Push(events.JobEvent{ + Event: events.EventPipeRun, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index 0f03cd20..9dd0aa5f 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -8,6 +8,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" + json "github.com/json-iterator/go" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/plugins/jobs/job" "github.com/spiral/roadrunner/v2/utils" @@ -108,7 +109,21 @@ func (i *Item) Body() []byte { // Context packs job context (job, id) into binary payload. // Not used in the sqs, MessageAttributes used instead func (i *Item) Context() ([]byte, error) { - return nil, nil + ctx, err := json.Marshal( + struct { + ID string `json:"id"` + Job string `json:"job"` + Headers map[string][]string `json:"headers"` + Timeout int64 `json:"timeout"` + Pipeline string `json:"pipeline"` + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline}, + ) + + if err != nil { + return nil, err + } + + return ctx, nil } func (i *Item) Ack() error { diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index ded79ae7..5722c19a 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -2,40 +2,71 @@ package sqs import ( "context" + "time" + "github.com/aws/aws-sdk-go-v2/aws/transport/http" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/aws/smithy-go" ) const ( + // All - get all message attribute names All string = "All" + + // NonExistentQueue AWS error code + NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue" ) -func (j *JobConsumer) listen() { +func (j *JobConsumer) listen() { //nolint:gocognit for { select { case <-j.pauseCh: return default: message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{ - QueueUrl: j.outputQ.QueueUrl, + QueueUrl: j.queueURL, MaxNumberOfMessages: j.prefetch, AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)}, MessageAttributeNames: []string{All}, VisibilityTimeout: j.visibilityTimeout, WaitTimeSeconds: j.waitTime, }) + if err != nil { + if oErr, ok := (err).(*smithy.OperationError); ok { + if rErr, ok := oErr.Err.(*http.ResponseError); ok { + if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok { + // in case of NonExistentQueue - recreate the queue + if apiErr.Code == NonExistentQueue { + j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault()) + _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags}) + if err != nil { + j.log.Error("create queue", "error", err) + } + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require + time.Sleep(time.Second * 2) + continue + } + } + } + } + j.log.Error("receive message", "error", err) continue } for i := 0; i < len(message.Messages); i++ { m := message.Messages[i] - item, err := unpack(&m, j.outputQ.QueueUrl, j.client) + item, err := unpack(&m, j.queueURL, j.client) if err != nil { _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.outputQ.QueueUrl, + QueueUrl: j.queueURL, ReceiptHandle: m.ReceiptHandle, }) if errD != nil { @@ -52,7 +83,7 @@ func (j *JobConsumer) listen() { } _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.outputQ.QueueUrl, + QueueUrl: j.queueURL, ReceiptHandle: m.ReceiptHandle, }) if errD != nil { |