diff options
Diffstat (limited to 'plugins/jobs/drivers/ephemeral')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 2 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/requeue.go | 25 |
2 files changed, 0 insertions, 27 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index a778f59b..03959b49 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -62,7 +62,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh // consume from the queue go jb.consume() - jb.requeueListener() return jb, nil } @@ -81,7 +80,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand // consume from the queue go jb.consume() - jb.requeueListener() return jb, nil } diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go deleted file mode 100644 index afb97d54..00000000 --- a/plugins/jobs/drivers/ephemeral/requeue.go +++ /dev/null @@ -1,25 +0,0 @@ -package ephemeral - -import "context" - -// requeueListener should handle items passed to requeue -func (j *JobConsumer) requeueListener() { - go func() { - for { //nolint:gosimple - select { - case item, ok := <-j.requeueCh: - if !ok { - j.log.Info("requeue channel closed") - return - } - - // TODO(rustatian): what timeout to use? - err := j.handleItem(context.TODO(), item) - if err != nil { - j.log.Error("requeue handle item", "error", err) - continue - } - } - } - }() -} |