summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
committerValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
commitb2da831f47284974551710d2767a7bdde0efa51d (patch)
tree7d8fee59cdb307110d2fcd872635437e0203321b /plugins/jobs/drivers/sqs
parent50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff)
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation. Complete redia for the beanstalk. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r--plugins/jobs/drivers/sqs/config.go8
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go43
-rw-r--r--plugins/jobs/drivers/sqs/item.go17
-rw-r--r--plugins/jobs/drivers/sqs/listener.go41
4 files changed, 93 insertions, 16 deletions
diff --git a/plugins/jobs/drivers/sqs/config.go b/plugins/jobs/drivers/sqs/config.go
index 0b4e8157..af5b1cfb 100644
--- a/plugins/jobs/drivers/sqs/config.go
+++ b/plugins/jobs/drivers/sqs/config.go
@@ -1,5 +1,7 @@
package sqs
+import "github.com/aws/aws-sdk-go-v2/aws"
+
type GlobalCfg struct {
Key string `mapstructure:"key"`
Secret string `mapstructure:"secret"`
@@ -36,7 +38,7 @@ type Config struct {
// Queue URLs and names are case-sensitive.
//
// This member is required.
- Queue string `mapstructure:"queue"`
+ Queue *string `mapstructure:"queue"`
// A map of attributes with their corresponding values. The following lists the
// names, descriptions, and values of the special request parameters that the
@@ -81,8 +83,8 @@ func (c *GlobalCfg) InitDefault() {
}
func (c *Config) InitDefault() {
- if c.Queue == "" {
- c.Queue = "default"
+ if c.Queue == nil {
+ c.Queue = aws.String("default")
}
if c.PrefetchCount == 0 || c.PrefetchCount > 10 {
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 7e1f6d56..18546715 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -34,7 +34,7 @@ type JobConsumer struct {
sessionToken string
region string
endpoint string
- queue string
+ queue *string
messageGroupID string
waitTime int32
prefetch int32
@@ -47,8 +47,8 @@ type JobConsumer struct {
attributes map[string]string
tags map[string]string
- client *sqs.Client
- outputQ *sqs.CreateQueueOutput
+ client *sqs.Client
+ queueURL *string
pauseCh chan struct{}
}
@@ -120,11 +120,22 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
})
})
- jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
if err != nil {
return nil, errors.E(op, err)
}
+ // assign a queue URL
+ jb.queueURL = out.QueueUrl
+
+ // To successfully create a new queue, you must provide a
+ // queue name that adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue, you
+ // must wait at least one second after the queue is created to be able to use the <------------
+ // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
+ time.Sleep(time.Second * 2)
+
return jb, nil
}
@@ -189,7 +200,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
messageGroupID: uuid.NewString(),
attributes: attr,
tags: tg,
- queue: pipe.String(queue, "default"),
+ queue: aws.String(pipe.String(queue, "default")),
prefetch: int32(pipe.Int(pref, 10)),
visibilityTimeout: int32(pipe.Int(visibility, 0)),
waitTime: int32(pipe.Int(waitTime, 0)),
@@ -217,11 +228,22 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
})
})
- jb.outputQ, err = jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: aws.String(jb.queue), Attributes: jb.attributes, Tags: jb.tags})
+ out, err := jb.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: jb.queue, Attributes: jb.attributes, Tags: jb.tags})
if err != nil {
return nil, errors.E(op, err)
}
+ // assign a queue URL
+ jb.queueURL = out.QueueUrl
+
+ // To successfully create a new queue, you must provide a
+ // queue name that adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue, you
+ // must wait at least one second after the queue is created to be able to use the <------------
+ // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
+ time.Sleep(time.Second * 2)
+
return jb, nil
}
@@ -245,7 +267,7 @@ func (j *JobConsumer) Push(jb *job.Job) error {
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
- _, err := j.client.SendMessage(context.Background(), msg.pack(j.outputQ.QueueUrl))
+ _, err := j.client.SendMessage(context.Background(), msg.pack(j.queueURL))
if err != nil {
return errors.E(op, err)
}
@@ -274,6 +296,13 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
// start listener
go j.listen()
+ j.eh.Push(events.JobEvent{
+ Event: events.EventPipeRun,
+ Driver: pipe.Driver(),
+ Pipeline: pipe.Name(),
+ Start: time.Now(),
+ })
+
return nil
}
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index 0f03cd20..9dd0aa5f 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -8,6 +8,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ json "github.com/json-iterator/go"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/plugins/jobs/job"
"github.com/spiral/roadrunner/v2/utils"
@@ -108,7 +109,21 @@ func (i *Item) Body() []byte {
// Context packs job context (job, id) into binary payload.
// Not used in the sqs, MessageAttributes used instead
func (i *Item) Context() ([]byte, error) {
- return nil, nil
+ ctx, err := json.Marshal(
+ struct {
+ ID string `json:"id"`
+ Job string `json:"job"`
+ Headers map[string][]string `json:"headers"`
+ Timeout int64 `json:"timeout"`
+ Pipeline string `json:"pipeline"`
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline},
+ )
+
+ if err != nil {
+ return nil, err
+ }
+
+ return ctx, nil
}
func (i *Item) Ack() error {
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index ded79ae7..5722c19a 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -2,40 +2,71 @@ package sqs
import (
"context"
+ "time"
+ "github.com/aws/aws-sdk-go-v2/aws/transport/http"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
+ "github.com/aws/smithy-go"
)
const (
+ // All - get all message attribute names
All string = "All"
+
+ // NonExistentQueue AWS error code
+ NonExistentQueue string = "AWS.SimpleQueueService.NonExistentQueue"
)
-func (j *JobConsumer) listen() {
+func (j *JobConsumer) listen() { //nolint:gocognit
for {
select {
case <-j.pauseCh:
return
default:
message, err := j.client.ReceiveMessage(context.Background(), &sqs.ReceiveMessageInput{
- QueueUrl: j.outputQ.QueueUrl,
+ QueueUrl: j.queueURL,
MaxNumberOfMessages: j.prefetch,
AttributeNames: []types.QueueAttributeName{types.QueueAttributeName(ApproximateReceiveCount)},
MessageAttributeNames: []string{All},
VisibilityTimeout: j.visibilityTimeout,
WaitTimeSeconds: j.waitTime,
})
+
if err != nil {
+ if oErr, ok := (err).(*smithy.OperationError); ok {
+ if rErr, ok := oErr.Err.(*http.ResponseError); ok {
+ if apiErr, ok := rErr.Err.(*smithy.GenericAPIError); ok {
+ // in case of NonExistentQueue - recreate the queue
+ if apiErr.Code == NonExistentQueue {
+ j.log.Error("receive message", "error code", apiErr.ErrorCode(), "message", apiErr.ErrorMessage(), "error fault", apiErr.ErrorFault())
+ _, err = j.client.CreateQueue(context.Background(), &sqs.CreateQueueInput{QueueName: j.queue, Attributes: j.attributes, Tags: j.tags})
+ if err != nil {
+ j.log.Error("create queue", "error", err)
+ }
+ // To successfully create a new queue, you must provide a
+ // queue name that adheres to the limits related to queues
+ // (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-queues.html)
+ // and is unique within the scope of your queues. After you create a queue, you
+ // must wait at least one second after the queue is created to be able to use the <------------
+ // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
+ time.Sleep(time.Second * 2)
+ continue
+ }
+ }
+ }
+ }
+
j.log.Error("receive message", "error", err)
continue
}
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, err := unpack(&m, j.outputQ.QueueUrl, j.client)
+ item, err := unpack(&m, j.queueURL, j.client)
if err != nil {
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.outputQ.QueueUrl,
+ QueueUrl: j.queueURL,
ReceiptHandle: m.ReceiptHandle,
})
if errD != nil {
@@ -52,7 +83,7 @@ func (j *JobConsumer) listen() {
}
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.outputQ.QueueUrl,
+ QueueUrl: j.queueURL,
ReceiptHandle: m.ReceiptHandle,
})
if errD != nil {