diff options
-rw-r--r-- | plugins/jobs/drivers/beanstalk/item.go | 1 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/requeue.go | 25 | ||||
-rw-r--r-- | plugins/jobs/job/job_options_test.go | 35 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 1 | ||||
-rw-r--r-- | proto/jobs/v1beta/jobs.proto | 1 |
6 files changed, 0 insertions, 65 deletions
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 50c54b12..a5aa1791 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -123,7 +123,6 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } } - func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index a778f59b..03959b49 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -62,7 +62,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh // consume from the queue go jb.consume() - jb.requeueListener() return jb, nil } @@ -81,7 +80,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand // consume from the queue go jb.consume() - jb.requeueListener() return jb, nil } diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go deleted file mode 100644 index afb97d54..00000000 --- a/plugins/jobs/drivers/ephemeral/requeue.go +++ /dev/null @@ -1,25 +0,0 @@ -package ephemeral - -import "context" - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - // TODO(rustatian): what timeout to use? - err := j.handleItem(context.TODO(), item) - if err != nil { - j.log.Error("requeue handle item", "error", err) - continue - } - } - } - }() -} diff --git a/plugins/jobs/job/job_options_test.go b/plugins/jobs/job/job_options_test.go index f4b1dc0c..061e83cc 100644 --- a/plugins/jobs/job/job_options_test.go +++ b/plugins/jobs/job/job_options_test.go @@ -7,36 +7,6 @@ import ( "github.com/stretchr/testify/assert" ) -func TestOptions_CanRetry(t *testing.T) { - opts := &Options{Attempts: 0} - - assert.False(t, opts.CanRetry(0)) - assert.False(t, opts.CanRetry(1)) -} - -func TestOptions_CanRetry_SameValue(t *testing.T) { - opts := &Options{Attempts: 1} - - assert.False(t, opts.CanRetry(0)) - assert.False(t, opts.CanRetry(1)) -} - -func TestOptions_CanRetry_Value(t *testing.T) { - opts := &Options{Attempts: 2} - - assert.True(t, opts.CanRetry(0)) - assert.False(t, opts.CanRetry(1)) - assert.False(t, opts.CanRetry(2)) -} - -func TestOptions_CanRetry_Value3(t *testing.T) { - opts := &Options{Attempts: 3} - - assert.True(t, opts.CanRetry(0)) - assert.True(t, opts.CanRetry(1)) - assert.False(t, opts.CanRetry(2)) -} - func TestOptions_RetryDuration(t *testing.T) { opts := &Options{RetryDelay: 0} assert.Equal(t, time.Duration(0), opts.RetryDuration()) @@ -74,12 +44,10 @@ func TestOptions_Merge(t *testing.T) { Pipeline: "pipeline", Delay: 2, Timeout: 1, - Attempts: 1, RetryDelay: 1, }) assert.Equal(t, "pipeline", opts.Pipeline) - assert.Equal(t, int64(1), opts.Attempts) assert.Equal(t, int64(2), opts.Delay) assert.Equal(t, int64(1), opts.Timeout) assert.Equal(t, int64(1), opts.RetryDelay) @@ -90,7 +58,6 @@ func TestOptions_MergeKeepOriginal(t *testing.T) { Pipeline: "default", Delay: 10, Timeout: 10, - Attempts: 10, RetryDelay: 10, } @@ -98,12 +65,10 @@ func TestOptions_MergeKeepOriginal(t *testing.T) { Pipeline: "pipeline", Delay: 2, Timeout: 1, - Attempts: 1, RetryDelay: 1, }) assert.Equal(t, "default", opts.Pipeline) - assert.Equal(t, int64(10), opts.Attempts) assert.Equal(t, int64(10), opts.Delay) assert.Equal(t, int64(10), opts.Timeout) assert.Equal(t, int64(10), opts.RetryDelay) diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index aeba499b..717ce33b 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -143,7 +143,6 @@ func (r *rpc) from(j *jobsv1beta.Job) *job.Job { Priority: j.GetOptions().GetPriority(), Pipeline: j.GetOptions().GetPipeline(), Delay: j.GetOptions().GetDelay(), - Attempts: j.GetOptions().GetAttempts(), RetryDelay: j.GetOptions().GetRetryDelay(), Timeout: j.GetOptions().GetTimeout(), }, diff --git a/proto/jobs/v1beta/jobs.proto b/proto/jobs/v1beta/jobs.proto index dc86e1c2..77d1fb51 100644 --- a/proto/jobs/v1beta/jobs.proto +++ b/proto/jobs/v1beta/jobs.proto @@ -38,7 +38,6 @@ message Options { int64 priority = 1; string pipeline = 2; int64 delay = 3; - int64 attempts = 4; int64 retry_delay = 5; int64 timeout = 6; } |