summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/ephemeral
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/ephemeral')
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go2
-rw-r--r--plugins/jobs/drivers/ephemeral/requeue.go25
2 files changed, 0 insertions, 27 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index a778f59b..03959b49 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -62,7 +62,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
// consume from the queue
go jb.consume()
- jb.requeueListener()
return jb, nil
}
@@ -81,7 +80,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
// consume from the queue
go jb.consume()
- jb.requeueListener()
return jb, nil
}
diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go
deleted file mode 100644
index afb97d54..00000000
--- a/plugins/jobs/drivers/ephemeral/requeue.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package ephemeral
-
-import "context"
-
-// requeueListener should handle items passed to requeue
-func (j *JobConsumer) requeueListener() {
- go func() {
- for { //nolint:gosimple
- select {
- case item, ok := <-j.requeueCh:
- if !ok {
- j.log.Info("requeue channel closed")
- return
- }
-
- // TODO(rustatian): what timeout to use?
- err := j.handleItem(context.TODO(), item)
- if err != nil {
- j.log.Error("requeue handle item", "error", err)
- continue
- }
- }
- }
- }()
-}