summaryrefslogtreecommitdiff
path: root/plugins/jobs
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 19:41:11 +0300
committerValery Piashchynski <[email protected]>2021-07-22 19:41:11 +0300
commit584e9ad1f50223f873661babae3b365a2b0662ec (patch)
tree16c0b10b57f6703e56f6c84d7d81ae43f554bbf4 /plugins/jobs
parent2ceebd687fd17b6029ef3df0e979c39bb39abc7f (diff)
Initial tests for all drivers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs')
-rw-r--r--plugins/jobs/drivers/beanstalk/consumer.go2
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go4
-rw-r--r--plugins/jobs/drivers/sqs/consumer.go2
-rw-r--r--plugins/jobs/drivers/sqs/listener.go8
4 files changed, 11 insertions, 5 deletions
diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go
index 1490e587..ab5aad14 100644
--- a/plugins/jobs/drivers/beanstalk/consumer.go
+++ b/plugins/jobs/drivers/beanstalk/consumer.go
@@ -258,7 +258,7 @@ func (j *JobConsumer) Pause(p string) {
j.stopCh <- struct{}{}
j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
+ Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index c49a23c1..2b0ff40b 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -68,10 +68,8 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
stopCh: make(chan struct{}, 1),
}
- jb.cfg.PipelineSize = uint64(pipeline.Int(pipelineSize, 100_000))
-
// initialize a local queue
- jb.localQueue = make(chan *Item, jb.cfg.PipelineSize)
+ jb.localQueue = make(chan *Item, pipeline.Int(pipelineSize, 100_000))
// consume from the queue
go jb.consume()
diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go
index 43617716..246ff3ba 100644
--- a/plugins/jobs/drivers/sqs/consumer.go
+++ b/plugins/jobs/drivers/sqs/consumer.go
@@ -340,7 +340,7 @@ func (j *JobConsumer) Pause(p string) {
j.pauseCh <- struct{}{}
j.eh.Push(events.JobEvent{
- Event: events.EventPipeStopped,
+ Event: events.EventPipePaused,
Driver: pipe.Driver(),
Pipeline: pipe.Name(),
Start: time.Now(),
diff --git a/plugins/jobs/drivers/sqs/listener.go b/plugins/jobs/drivers/sqs/listener.go
index 8c5d887e..887f8358 100644
--- a/plugins/jobs/drivers/sqs/listener.go
+++ b/plugins/jobs/drivers/sqs/listener.go
@@ -78,11 +78,19 @@ func (j *JobConsumer) listen() { //nolint:gocognit
continue
}
+ // No retry
+ if item.Options.MaxAttempts == 0 {
+ j.pq.Insert(item)
+ continue
+ }
+
+ // MaxAttempts option specified
if item.Options.CanRetry() {
j.pq.Insert(item)
continue
}
+ // If MaxAttempts is more than 0, and can't retry -> delete the message
_, errD := j.client.DeleteMessage(context.Background(), &sqs.DeleteMessageInput{
QueueUrl: j.queueURL,
ReceiptHandle: m.ReceiptHandle,