diff options
Diffstat (limited to 'plugins/jobs/drivers/sqs')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 9 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/item.go | 30 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/requeue.go | 25 |
3 files changed, 24 insertions, 40 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 8d93b12c..5d741358 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -50,8 +50,7 @@ type JobConsumer struct { client *sqs.Client queueURL *string - requeueCh chan *Item - pauseCh chan struct{} + pauseCh chan struct{} } func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -103,7 +102,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -138,8 +136,6 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) - jb.requeueListener() - return jb, nil } @@ -205,7 +201,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -240,8 +235,6 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) - jb.requeueListener() - return jb, nil } diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index a761d6bd..f5fac0b3 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -64,7 +64,7 @@ type Options struct { queue *string receiptHandler *string client *sqs.Client - requeueCh chan *Item + requeueFn func(context.Context, *Item) error } // DelayDuration returns delay duration in a form of time.Duration. @@ -144,12 +144,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + // requeue message + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + return err } + + // Delete job from the queue only after successful requeue + _, err = i.Options.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ + QueueUrl: i.Options.queue, + ReceiptHandle: i.Options.receiptHandler, + }) + + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } func fromJob(job *job.Job) *Item { @@ -227,7 +243,7 @@ func (j *JobConsumer) unpack(msg *types.Message) (*Item, error) { client: j.client, queue: j.queue, receiptHandler: msg.ReceiptHandle, - requeueCh: j.requeueCh, + requeueFn: j.handleItem, }, } diff --git a/plugins/jobs/drivers/sqs/requeue.go b/plugins/jobs/drivers/sqs/requeue.go deleted file mode 100644 index 87e885e0..00000000 --- a/plugins/jobs/drivers/sqs/requeue.go +++ /dev/null @@ -1,25 +0,0 @@ -package sqs - -import "context" - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - // TODO(rustatian): what context to use - err := j.handleItem(context.TODO(), item) - if err != nil { - j.log.Error("requeue handle item", "error", err) - continue - } - } - } - }() -} |