diff options
author | Valery Piashchynski <[email protected]> | 2021-08-11 13:25:53 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-11 13:25:53 +0300 |
commit | 2924f4b4daa3c53408a036583dcc39f6de805e2b (patch) | |
tree | 44262a307c0f00f04ccd0f59429e7e63717e4d03 /plugins/jobs/drivers | |
parent | aeb7d301eefc1c47374ab9b758ea137151e29219 (diff) |
Add headers to the protocol
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 52c2cf65..a778f59b 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -34,8 +34,7 @@ type JobConsumer struct { // time.sleep goroutines max number goroutinesMaxNum uint64 - requeueCh chan *Item - stopCh chan struct{} + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -47,7 +46,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -76,7 +74,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // initialize a local queue @@ -147,10 +144,14 @@ func (j *JobConsumer) consume() { // redirect for { select { - case item := <-j.localPrefetch: + case item, ok := <-j.localPrefetch: + if !ok { + j.log.Warn("ephemeral local prefetch queue was closed") + return + } // set requeue channel - item.Options.requeueCh = j.requeueCh + item.Options.requeueCh = j.localPrefetch j.pq.Insert(item) case <-j.stopCh: return |