diff options
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 43 |
1 files changed, 36 insertions, 7 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 7e1f6d56..18546715 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -34,7 +34,7 @@ type JobConsumer struct { sessionToken string region string endpoint string - queue string + queue *string messageGroupID string waitTime int32 prefetch int32 @@ -47,8 +47,8 @@ type JobConsumer struct { attributes map[string]string tags map[string]string - client *sqs.Client - outputQ *sqs.CreateQueueOutput + client *sqs.Client + queueURL *string pauseCh chan struct{} } @@ -120,11 +120,22 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure }) }) - jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags}) + out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) if err != nil { return nil, errors.E(op, err) } + // assign a queue URL + jb.queueURL = out.QueueUrl + + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require + time.Sleep(time.Second * 2) + return jb, nil } @@ -189,7 +200,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf messageGroupID: uuid.NewString(), attributes: attr, tags: tg, - queue: pipe.String(queue, "default"), + queue: aws.String(pipe.String(queue, "default")), prefetch: int32(pipe.Int(pref, 10)), visibilityTimeout: int32(pipe.Int(visibility, 0)), waitTime: int32(pipe.Int(waitTime, 0)), @@ -217,11 +228,22 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf }) }) - jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags}) + out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags}) if err != nil { return nil, errors.E(op, err) } + // assign a queue URL + jb.queueURL = out.QueueUrl + + // To successfully create a new queue, you must provide a + // queue name that adheres to the limits related to queues + // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html) + // and is unique within the scope of your queues. After you create a queue, you + // must wait at least one second after the queue is created to be able to use the <------------ + // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require + time.Sleep(time.Second * 2) + return jb, nil } @@ -245,7 +267,7 @@ func (j *JobConsumer) Push(jb *job.Job) error { // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. - _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl)) + _, err := j.client.SendMessage(context.Background(), msg.pack(j.queueURL)) if err != nil { return errors.E(op, err) } @@ -274,6 +296,13 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { // start listener go j.listen() + j.eh.Push(events.JobEvent{ + Event: events.EventPipeRun, + Driver: pipe.Driver(), + Pipeline: pipe.Name(), + Start: time.Now(), + }) + return nil } |