summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go43
1 files changed, 36 insertions, 7 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 7e1f6d56..18546715 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -34,7 +34,7 @@ type JobConsumer struct {
sessionToken string
region string
endpoint string
- queue string
+ queue *string
messageGroupID string
waitTime int32
prefetch int32
@@ -47,8 +47,8 @@ type JobConsumer struct {
attributes map[string]string
tags map[string]string
- client *sqs.Client
- outputQ *sqs.CreateQueueOutput
+ client *sqs.Client
+ queueURL *string
pauseCh chan struct{}
}
@@ -120,11 +120,22 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
})
})
- jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
if err != nil {
return nil, errors.E(op, err)
}
+ // assign a queue URL
+ jb.queueURL = out.QueueUrl
+
+ // To successfully create a new queue, you must provide a
+ // queue name that adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue, you
+ // must wait at least one second after the queue is created to be able to use the <------------
+ // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
+ time.Sleep(time.Second * 2)
+
return jb, nil
}
@@ -189,7 +200,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
messageGroupID: uuid.NewString(),
attributes: attr,
tags: tg,
- queue: pipe.String(queue, "default"),
+ queue: aws.String(pipe.String(queue, "default")),
prefetch: int32(pipe.Int(pref, 10)),
visibilityTimeout: int32(pipe.Int(visibility, 0)),
waitTime: int32(pipe.Int(waitTime, 0)),
@@ -217,11 +228,22 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
})
})
- jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
if err != nil {
return nil, errors.E(op, err)
}
+ // assign a queue URL
+ jb.queueURL = out.QueueUrl
+
+ // To successfully create a new queue, you must provide a
+ // queue name that adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue, you
+ // must wait at least one second after the queue is created to be able to use the <------------
+ // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
+ time.Sleep(time.Second * 2)
+
return jb, nil
}
@@ -245,7 +267,7 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
- _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl))
+ _, err := j.client.SendMessage(context.Background(), msg.pack(j.queueURL))
if err != nil {
return errors.E(op, err)
}
@@ -274,6 +296,13 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
// start listener
go j.listen()
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}