diff options
author | Valery Piashchynski <[email protected]> | 2021-08-10 22:48:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-10 22:48:31 +0300 |
commit | d449d9d5aec1eec6d494064299feb1551f88ffe2 (patch) | |
tree | a905126b44bcfab29af9b5bc3eddaf5398375975 /plugins/jobs/drivers/sqs | |
parent | a8a7f4194156440ef3157d8e5d75c43ed0327bcf (diff) |
Add support for the jobs-worker protocol for the beanstalk,ephemeral and
sqs drivers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 21 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 65 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 24 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/requeue.go | 25 |
4 files changed, 69 insertions, 66 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index b81d08e5..8d93b12c 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -50,7 +50,8 @@ type JobConsumer struct { client *sqs.Client queueURL *string - pauseCh chan struct{} + requeueCh chan *Item + pauseCh chan struct{} } func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -102,6 +103,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -136,6 +138,8 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) + jb.requeueListener() + return jb, nil } @@ -201,6 +205,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -235,6 +240,8 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) + jb.requeueListener() + return jb, nil } @@ -254,13 +261,19 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) } - msg := fromJob(jb) + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + return nil +} +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. _, err := j.client.SendMessage(ctx, msg.pack(j.queueURL)) if err != nil { - return errors.E(op, err) + return err } return nil @@ -310,7 +323,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index a3039d1b..ea4ac8b7 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -60,23 +60,12 @@ type Options struct { // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` - // Maximum number of attempts to receive and process the message - MaxAttempts int64 `json:"max_attempts,omitempty"` - // Private ================ approxReceiveCount int64 queue *string receiptHandler *string client *sqs.Client -} - -// CanRetry must return true if broker is allowed to re-run the job. -func (o *Options) CanRetry() bool { - // Attempts 1 and 0 has identical effect - if o.MaxAttempts == 0 || o.MaxAttempts == 1 { - return false - } - return o.MaxAttempts > (o.approxReceiveCount + 1) + requeueCh chan *Item } // DelayDuration returns delay duration in a form of time.Duration. @@ -140,10 +129,6 @@ func (i *Item) Ack() error { } func (i *Item) Nack() error { - if i.Options.CanRetry() { - return nil - } - _, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ QueueUrl: i.Options.queue, ReceiptHandle: i.Options.receiptHandler, @@ -156,8 +141,15 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(_ int64) error { - return nil +func (i *Item) Requeue(delay int64) error { + // overwrite the delay + i.Options.Delay = delay + select { + case i.Options.requeueCh <- i: + return nil + default: + return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + } } func fromJob(job *job.Job) *Item { @@ -167,11 +159,10 @@ func fromJob(job *job.Job) *Item { Payload: job.Payload, Headers: job.Headers, Options: &Options{ - Priority: job.Options.Priority, - Pipeline: job.Options.Pipeline, - Delay: job.Options.Delay, - Timeout: job.Options.Timeout, - MaxAttempts: job.Options.Attempts, + Priority: job.Options.Priority, + Pipeline: job.Options.Pipeline, + Delay: job.Options.Delay, + Timeout: job.Options.Timeout, }, } } @@ -182,16 +173,15 @@ func (i *Item) pack(queue *string) *sqs.SendMessageInput { QueueUrl: queue, DelaySeconds: int32(i.Options.Delay), MessageAttributes: map[string]types.MessageAttributeValue{ - job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, - job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, - job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))}, - job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, - job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))}, + job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)}, + job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))}, + job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))}, + job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))}, }, } } -func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error) { +func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { const op = errors.Op("sqs_unpack") // reserved if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok { @@ -204,11 +194,6 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error } } - attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue) - if err != nil { - return nil, errors.E(op, err) - } - delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue) if err != nil { return nil, errors.E(op, err) @@ -233,16 +218,16 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error Job: *msg.MessageAttributes[job.RRJob].StringValue, Payload: *msg.Body, Options: &Options{ - Delay: int64(delay), - Timeout: int64(to), - Priority: int64(priority), - MaxAttempts: int64(attempt), + Delay: int64(delay), + Timeout: int64(to), + Priority: int64(priority), // private approxReceiveCount: int64(recCount), - client: client, - queue: queue, + client: j.client, + queue: j.queue, receiptHandler: msg.ReceiptHandle, + requeueCh: j.requeueCh, }, } diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index e2323fa3..b72ac065 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -64,7 +64,7 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit for i := 0; i < len(message.Messages); i++ { m := message.Messages[i] - item, err := unpack(&m, j.queueURL, j.client) + item, err := j.unpack(&m) if err != nil { _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ QueueUrl: j.queueURL, @@ -78,27 +78,7 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit continue } - // No retry - if item.Options.MaxAttempts == 0 { - j.pq.Insert(item) - continue - } - - // MaxAttempts option specified - if item.Options.CanRetry() { - j.pq.Insert(item) - continue - } - - // If MaxAttempts is more than 0, and can't retry -> delete the message - _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ - QueueUrl: j.queueURL, - ReceiptHandle: m.ReceiptHandle, - }) - if errD != nil { - j.log.Error("message unpack, failed to delete the message from the queue", "error", err) - continue - } + j.pq.Insert(item) } } } diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go new file mode 100644 index 00000000..87e885e0 --- /dev/null +++ b/plugins/jobs/drivers/sqs/requeue.go @@ -0,0 +1,25 @@ +package sqs + +import "context" + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + // TODO(rustatian): what context to use + err := j.handleItem(context.TODO(), item) + if err != nil { + j.log.Error("requeue handle item", "error", err) + continue + } + } + } + }() +} |