diff options
Diffstat (limited to 'plugins/jobs/drivers/ephemeral')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 52c2cf65..a778f59b 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -34,8 +34,7 @@ type JobConsumer struct { // time.sleep goroutines max number goroutinesMaxNum uint64 - requeueCh chan *Item - stopCh chan struct{} + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -47,7 +46,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -76,7 +74,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // initialize a local queue @@ -147,10 +144,14 @@ func (j *JobConsumer) consume() { // redirect for { select { - case item := <-j.localPrefetch: + case item, ok := <-j.localPrefetch: + if !ok { + j.log.Warn("ephemeral local prefetch queue was closed") + return + } // set requeue channel - item.Options.requeueCh = j.requeueCh + item.Options.requeueCh = j.localPrefetch j.pq.Insert(item) case <-j.stopCh: return |