summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 13:44:41 +0300
committerValery Piashchynski <[email protected]>2021-08-11 13:44:41 +0300
commite855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (patch)
tree6653231b4ad11ba93d3b1562c38a2798abb467ff
parentde37ed3ae8d08a50d9ffe088c1d58d9dffdf7c9b (diff)
Remove attempts from the proto, and general jobs options
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go1
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go2
-rw-r--r--plugins/jobs/drivers/ephemeral/requeue.go25
-rw-r--r--plugins/jobs/job/job_options_test.go35
-rw-r--r--plugins/jobs/rpc.go1
-rw-r--r--proto/jobs/v1beta/jobs.proto1
6 files changed, 0 insertions, 65 deletions
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 50c54b12..a5aa1791 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -123,7 +123,6 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
}
}
-
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index a778f59b..03959b49 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -62,7 +62,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
// consume from the queue
go jb.consume()
- jb.requeueListener()
return jb, nil
}
@@ -81,7 +80,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
// consume from the queue
go jb.consume()
- jb.requeueListener()
return jb, nil
}
diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go
deleted file mode 100644
index afb97d54..00000000
--- a/plugins/jobs/drivers/ephemeral/requeue.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package ephemeral
-
-import "context"
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- // TODO(rustatian): what timeout to use?
- err := j.handleItem(context.TODO(), item)
- if err != nil {
- j.log.Error("requeue handle item", "error", err)
- continue
- }
- }
- }
- }()
-}
diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_options_test.go
index f4b1dc0c..061e83cc 100644
--- a/plugins/jobs/job/job_options_test.go
+++ b/plugins/jobs/job/job_options_test.go
@@ -7,36 +7,6 @@ import (
"github.com/stretchr/testify/assert"
)
-func TestOptions_CanRetry(t *testing.T) {
- opts := &Options{Attempts: 0}
-
- assert.False(t, opts.CanRetry(0))
- assert.False(t, opts.CanRetry(1))
-}
-
-func TestOptions_CanRetry_SameValue(t *testing.T) {
- opts := &Options{Attempts: 1}
-
- assert.False(t, opts.CanRetry(0))
- assert.False(t, opts.CanRetry(1))
-}
-
-func TestOptions_CanRetry_Value(t *testing.T) {
- opts := &Options{Attempts: 2}
-
- assert.True(t, opts.CanRetry(0))
- assert.False(t, opts.CanRetry(1))
- assert.False(t, opts.CanRetry(2))
-}
-
-func TestOptions_CanRetry_Value3(t *testing.T) {
- opts := &Options{Attempts: 3}
-
- assert.True(t, opts.CanRetry(0))
- assert.True(t, opts.CanRetry(1))
- assert.False(t, opts.CanRetry(2))
-}
-
func TestOptions_RetryDuration(t *testing.T) {
opts := &Options{RetryDelay: 0}
assert.Equal(t, time.Duration(0), opts.RetryDuration())
@@ -74,12 +44,10 @@ func TestOptions_Merge(t *testing.T) {
Pipeline: "pipeline",
Delay: 2,
Timeout: 1,
- Attempts: 1,
RetryDelay: 1,
})
assert.Equal(t, "pipeline", opts.Pipeline)
- assert.Equal(t, int64(1), opts.Attempts)
assert.Equal(t, int64(2), opts.Delay)
assert.Equal(t, int64(1), opts.Timeout)
assert.Equal(t, int64(1), opts.RetryDelay)
@@ -90,7 +58,6 @@ func TestOptions_MergeKeepOriginal(t *testing.T) {
Pipeline: "default",
Delay: 10,
Timeout: 10,
- Attempts: 10,
RetryDelay: 10,
}
@@ -98,12 +65,10 @@ func TestOptions_MergeKeepOriginal(t *testing.T) {
Pipeline: "pipeline",
Delay: 2,
Timeout: 1,
- Attempts: 1,
RetryDelay: 1,
})
assert.Equal(t, "default", opts.Pipeline)
- assert.Equal(t, int64(10), opts.Attempts)
assert.Equal(t, int64(10), opts.Delay)
assert.Equal(t, int64(10), opts.Timeout)
assert.Equal(t, int64(10), opts.RetryDelay)
diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go
index aeba499b..717ce33b 100644
--- a/plugins/jobs/rpc.go
+++ b/plugins/jobs/rpc.go
@@ -143,7 +143,6 @@ func (r *rpc) from(j *jobsv1beta.Job) *job.Job {
Priority: j.GetOptions().GetPriority(),
Pipeline: j.GetOptions().GetPipeline(),
Delay: j.GetOptions().GetDelay(),
- Attempts: j.GetOptions().GetAttempts(),
RetryDelay: j.GetOptions().GetRetryDelay(),
Timeout: j.GetOptions().GetTimeout(),
},
diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto
index dc86e1c2..77d1fb51 100644
--- a/proto/jobs/v1beta/jobs.proto
+++ b/proto/jobs/v1beta/jobs.proto
@@ -38,7 +38,6 @@ message Options {
int64 priority = 1;
string pipeline = 2;
int64 delay = 3;
- int64 attempts = 4;
int64 retry_delay = 5;
int64 timeout = 6;
}