diff options
Diffstat (limited to 'plugins/jobs/drivers/sqs/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 21 |
1 files changed, 17 insertions, 4 deletions
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index b81d08e5..8d93b12c 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -50,7 +50,8 @@ type JobConsumer struct { client *sqs.Client queueURL *string - pauseCh chan struct{} + requeueCh chan *Item + pauseCh chan struct{} } func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configurer, e events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -102,6 +103,7 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -136,6 +138,8 @@ func NewSQSConsumer(configKey string, log logger.Logger, cfg cfgPlugin.Configure // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) + jb.requeueListener() + return jb, nil } @@ -201,6 +205,7 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf secret: globalCfg.Secret, endpoint: globalCfg.Endpoint, pauseCh: make(chan struct{}, 1), + requeueCh: make(chan *Item, 1000), } // PARSE CONFIGURATION ------- @@ -235,6 +240,8 @@ func FromPipeline(pipe *pipeline.Pipeline, log logger.Logger, cfg cfgPlugin.Conf // queue. To get the queue URL, use the GetQueueUrl action. GetQueueUrl require time.Sleep(time.Second * 2) + jb.requeueListener() + return jb, nil } @@ -254,13 +261,19 @@ func (j *JobConsumer) Push(ctx context.Context, jb *job.Job) error { return errors.E(op, errors.Errorf("unable to push, maximum possible delay is 900 seconds (15 minutes), provided: %d", jb.Options.Delay)) } - msg := fromJob(jb) + err := j.handleItem(ctx, fromJob(jb)) + if err != nil { + return errors.E(op, err) + } + return nil +} +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { // The new value for the message's visibility timeout (in seconds). Values range: 0 // to 43200. Maximum: 12 hours. _, err := j.client.SendMessage(ctx, msg.pack(j.queueURL)) if err != nil { - return errors.E(op, err) + return err } return nil @@ -310,7 +323,7 @@ func (j *JobConsumer) Stop(context.Context) error { return nil } -func (j *JobConsumer) Pause(ctx context.Context, p string) { +func (j *JobConsumer) Pause(_ context.Context, p string) { // load atomic value pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { |