diff options
author | Valery Piashchynski <[email protected]> | 2021-07-22 19:41:11 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-22 19:41:11 +0300 |
commit | 584e9ad1f50223f873661babae3b365a2b0662ec (patch) | |
tree | 16c0b10b57f6703e56f6c84d7d81ae43f554bbf4 /plugins/jobs | |
parent | 2ceebd687fd17b6029ef3df0e979c39bb39abc7f (diff) |
Initial tests for all drivers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 4 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/listener.go | 8 |
4 files changed, 11 insertions, 5 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 1490e587..ab5aad14 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -258,7 +258,7 @@ func (j *JobConsumer) Pause(p string) { j.stopCh <- struct{}{} j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, + Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index c49a23c1..2b0ff40b 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -68,10 +68,8 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand stopCh: make(chan struct{}, 1), } - jb.cfg.PipelineSize = uint64(pipeline.Int(pipelineSize, 100_000)) - // initialize a local queue - jb.localQueue = make(chan *Item, jb.cfg.PipelineSize) + jb.localQueue = make(chan *Item, pipeline.Int(pipelineSize, 100_000)) // consume from the queue go jb.consume() diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index 43617716..246ff3ba 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -340,7 +340,7 @@ func (j *JobConsumer) Pause(p string) { j.pauseCh <- struct{}{} j.eh.Push(events.JobEvent{ - Event: events.EventPipeStopped, + Event: events.EventPipePaused, Driver: pipe.Driver(), Pipeline: pipe.Name(), Start: time.Now(), diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go index 8c5d887e..887f8358 100644 --- a/plugins/jobs/drivers/sqs/listener.go +++ b/plugins/jobs/drivers/sqs/listener.go @@ -78,11 +78,19 @@ func (j *JobConsumer) listen() { //nolint:gocognit continue } + // No retry + if item.Options.MaxAttempts == 0 { + j.pq.Insert(item) + continue + } + + // MaxAttempts option specified if item.Options.CanRetry() { j.pq.Insert(item) continue } + // If MaxAttempts is more than 0, and can't retry -> delete the message _, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{ QueueUrl: j.queueURL, ReceiptHandle: m.ReceiptHandle, |