summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/sqs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-10 22:48:31 +0300
committerValery Piashchynski <[email protected]>2021-08-10 22:48:31 +0300
commitd449d9d5aec1eec6d494064299feb1551f88ffe2 (patch)
treea905126b44bcfab29af9b5bc3eddaf5398375975 /plugins/jobs/drivers/sqs
parenta8a7f4194156440ef3157d8e5d75c43ed0327bcf (diff)
Add support for the jobs-worker protocol for the beanstalk,ephemeral and
sqs drivers Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go21
-rw-r--r--plugins/jobs/drivers/sqs/item.go65
-rw-r--r--plugins/jobs/drivers/sqs/listener.go24
-rw-r--r--plugins/jobs/drivers/sqs/requeue.go25
4 files changed, 69 insertions, 66 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index b81d08e5..8d93b12c 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -50,7 +50,8 @@ type JobConsumer struct {
client *sqs.Client
queueURL *string
- pauseCh chan struct{}
+ requeueCh chan *Item
+ pauseCh chan struct{}
}
func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -102,6 +103,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -136,6 +138,8 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
+ jb.requeueListener()
+
return jb, nil
}
@@ -201,6 +205,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
secret: globalCfg.Secret,
endpoint: globalCfg.Endpoint,
pauseCh: make(chan struct{}, 1),
+ requeueCh: make(chan *Item, 1000),
}
// PARSE CONFIGURATION -------
@@ -235,6 +240,8 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf
// queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require
time.Sleep(time.Second * 2)
+ jb.requeueListener()
+
return jb, nil
}
@@ -254,13 +261,19 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error {
return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay))
}
- msg := fromJob(jb)
+ err := j.handleItem(ctx, fromJob(jb))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+}
+func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error {
// The new value for the message's visibility timeout (in seconds). Values range: 0
// to 43200. Maximum: 12 hours.
_, err := j.client.SendMessage(ctx, msg.pack(j.queueURL))
if err != nil {
- return errors.E(op, err)
+ return err
}
return nil
@@ -310,7 +323,7 @@ func (j *JobConsumer) Stop(context.Context) error {
return nil
}
-func (j *JobConsumer) Pause(ctx context.Context, p string) {
+func (j *JobConsumer) Pause(_ context.Context, p string) {
// load atomic value
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index a3039d1b..ea4ac8b7 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -60,23 +60,12 @@ type Options struct {
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
- // Maximum number of attempts to receive and process the message
- MaxAttempts int64 `json:"max_attempts,omitempty"`
-
// Private ================
approxReceiveCount int64
queue *string
receiptHandler *string
client *sqs.Client
-}
-
-// CanRetry must return true if broker is allowed to re-run the job.
-func (o *Options) CanRetry() bool {
- // Attempts 1 and 0 has identical effect
- if o.MaxAttempts == 0 || o.MaxAttempts == 1 {
- return false
- }
- return o.MaxAttempts > (o.approxReceiveCount + 1)
+ requeueCh chan *Item
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -140,10 +129,6 @@ func (i *Item) Ack() error {
}
func (i *Item) Nack() error {
- if i.Options.CanRetry() {
- return nil
- }
-
_, err := i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: i.Options.queue,
ReceiptHandle: i.Options.receiptHandler,
@@ -156,8 +141,15 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(_ int64) error {
- return nil
+func (i *Item) Requeue(delay int64) error {
+ // overwrite the delay
+ i.Options.Delay = delay
+ select {
+ case i.Options.requeueCh <- i:
+ return nil
+ default:
+ return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+ }
}
func fromJob(job *job.Job) *Item {
@@ -167,11 +159,10 @@ func fromJob(job *job.Job) *Item {
Payload: job.Payload,
Headers: job.Headers,
Options: &Options{
- Priority: job.Options.Priority,
- Pipeline: job.Options.Pipeline,
- Delay: job.Options.Delay,
- Timeout: job.Options.Timeout,
- MaxAttempts: job.Options.Attempts,
+ Priority: job.Options.Priority,
+ Pipeline: job.Options.Pipeline,
+ Delay: job.Options.Delay,
+ Timeout: job.Options.Timeout,
},
}
}
@@ -182,16 +173,15 @@ func (i *Item) pack(queue *string) *sqs.SendMessageInput {
QueueUrl: queue,
DelaySeconds: int32(i.Options.Delay),
MessageAttributes: map[string]types.MessageAttributeValue{
- job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
- job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
- job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
- job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
- job.RRMaxAttempts: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.MaxAttempts)))},
+ job.RRJob: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(i.Job)},
+ job.RRDelay: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Delay)))},
+ job.RRTimeout: {DataType: aws.String(StringType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Timeout)))},
+ job.RRPriority: {DataType: aws.String(NumberType), BinaryValue: nil, BinaryListValues: nil, StringListValues: nil, StringValue: aws.String(strconv.Itoa(int(i.Options.Priority)))},
},
}
}
-func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error) {
+func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) {
const op = errors.Op("sqs_unpack")
// reserved
if _, ok := msg.Attributes[ApproximateReceiveCount]; !ok {
@@ -204,11 +194,6 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error
}
}
- attempt, err := strconv.Atoi(*msg.MessageAttributes[job.RRMaxAttempts].StringValue)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
delay, err := strconv.Atoi(*msg.MessageAttributes[job.RRDelay].StringValue)
if err != nil {
return nil, errors.E(op, err)
@@ -233,16 +218,16 @@ func unpack(msg *types.Message, queue *string, client *sqs.Client) (*Item, error
Job: *msg.MessageAttributes[job.RRJob].StringValue,
Payload: *msg.Body,
Options: &Options{
- Delay: int64(delay),
- Timeout: int64(to),
- Priority: int64(priority),
- MaxAttempts: int64(attempt),
+ Delay: int64(delay),
+ Timeout: int64(to),
+ Priority: int64(priority),
// private
approxReceiveCount: int64(recCount),
- client: client,
- queue: queue,
+ client: j.client,
+ queue: j.queue,
receiptHandler: msg.ReceiptHandle,
+ requeueCh: j.requeueCh,
},
}
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index e2323fa3..b72ac065 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -64,7 +64,7 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
for i := 0; i < len(message.Messages); i++ {
m := message.Messages[i]
- item, err := unpack(&m, j.queueURL, j.client)
+ item, err := j.unpack(&m)
if err != nil {
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: j.queueURL,
@@ -78,27 +78,7 @@ func (j *JobConsumer) listen(ctx context.Context) { //nolint:gocognit
continue
}
- // No retry
- if item.Options.MaxAttempts == 0 {
- j.pq.Insert(item)
- continue
- }
-
- // MaxAttempts option specified
- if item.Options.CanRetry() {
- j.pq.Insert(item)
- continue
- }
-
- // If MaxAttempts is more than 0, and can't retry -> delete the message
- _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
- QueueUrl: j.queueURL,
- ReceiptHandle: m.ReceiptHandle,
- })
- if errD != nil {
- j.log.Error("message unpack, failed to delete the message from the queue", "error", err)
- continue
- }
+ j.pq.Insert(item)
}
}
}
diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go
new file mode 100644
index 00000000..87e885e0
--- /dev/null
+++ b/plugins/jobs/drivers/sqs/requeue.go
@@ -0,0 +1,25 @@
+package sqs
+
+import "context"
+
+// requeueListener should handle items passed to requeue
+func (j *JobConsumer) requeueListener() {
+ go func() {
+ for { //nolint:gosimple
+ select {
+ case item, ok := <-j.requeueCh:
+ if !ok {
+ j.log.Info("requeue channel closed")
+ return
+ }
+
+ // TODO(rustatian): what context to use
+ err := j.handleItem(context.TODO(), item)
+ if err != nil {
+ j.log.Error("requeue handle item", "error", err)
+ continue
+ }
+ }
+ }
+ }()
+}