summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 13:25:53 +0300
committerValery Piashchynski <[email protected]>2021-08-11 13:25:53 +0300
commit2924f4b4daa3c53408a036583dcc39f6de805e2b (patch)
tree44262a307c0f00f04ccd0f59429e7e63717e4d03 /plugins/jobs/drivers
parentaeb7d301eefc1c47374ab9b758ea137151e29219 (diff)
Add headers to the protocol
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go13
1 files changed, 7 insertions, 6 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 52c2cf65..a778f59b 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -34,8 +34,7 @@ type JobConsumer struct {
// time.sleep goroutines max number
goroutinesMaxNum uint64
- requeueCh chan *Item
- stopCh chan struct{}
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -47,7 +46,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -76,7 +74,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
// initialize a local queue
@@ -147,10 +144,14 @@ func (j *JobConsumer) consume() {
// redirect
for {
select {
- case item := <-j.localPrefetch:
+ case item, ok := <-j.localPrefetch:
+ if !ok {
+ j.log.Warn("ephemeral local prefetch queue was closed")
+ return
+ }
// set requeue channel
- item.Options.requeueCh = j.requeueCh
+ item.Options.requeueCh = j.localPrefetch
j.pq.Insert(item)
case <-j.stopCh:
return