diff options
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 13 | ||||
-rw-r--r-- | plugins/jobs/response_protocol.md | 13 |
2 files changed, 17 insertions, 9 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 52c2cf65..a778f59b 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -34,8 +34,7 @@ type JobConsumer struct { // time.sleep goroutines max number goroutinesMaxNum uint64 - requeueCh chan *Item - stopCh chan struct{} + stopCh chan struct{} } func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) { @@ -47,7 +46,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } err := cfg.UnmarshalKey(configKey, &jb.cfg) @@ -76,7 +74,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand eh: eh, goroutinesMaxNum: 1000, stopCh: make(chan struct{}, 1), - requeueCh: make(chan *Item, 1000), } // initialize a local queue @@ -147,10 +144,14 @@ func (j *JobConsumer) consume() { // redirect for { select { - case item := <-j.localPrefetch: + case item, ok := <-j.localPrefetch: + if !ok { + j.log.Warn("ephemeral local prefetch queue was closed") + return + } // set requeue channel - item.Options.requeueCh = j.requeueCh + item.Options.requeueCh = j.localPrefetch j.pq.Insert(item) case <-j.stopCh: return diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md index a9230ffc..577317d4 100644 --- a/plugins/jobs/response_protocol.md +++ b/plugins/jobs/response_protocol.md @@ -12,9 +12,10 @@ Types are: 2 - ... ``` -- `NO_ERROR`: contains only `type` and empty `data`. -- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job, -`dalay_seconds`: to delay a queue for a provided amount of seconds. +- `NO_ERROR`: contains only `type` and empty `data`. +- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the + job, + `dalay_seconds`: to delay a queue for a provided amount of seconds. For example: @@ -37,6 +38,12 @@ For example: "data": { "message": "internal worker error", "requeue": true, + "headers": [ + { + "test": "1", + "test2": "2" + } + ], "delay_seconds": 10 } } |