summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 13:25:53 +0300
committerValery Piashchynski <[email protected]>2021-08-11 13:25:53 +0300
commit2924f4b4daa3c53408a036583dcc39f6de805e2b (patch)
tree44262a307c0f00f04ccd0f59429e7e63717e4d03
parentaeb7d301eefc1c47374ab9b758ea137151e29219 (diff)
Add headers to the protocol
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/jobs/drivers/ephemeral/consumer.go13
-rw-r--r--plugins/jobs/response_protocol.md13
2 files changed, 17 insertions, 9 deletions
diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go
index 52c2cf65..a778f59b 100644
--- a/plugins/jobs/drivers/ephemeral/consumer.go
+++ b/plugins/jobs/drivers/ephemeral/consumer.go
@@ -34,8 +34,7 @@ type JobConsumer struct {
// time.sleep goroutines max number
goroutinesMaxNum uint64
- requeueCh chan *Item
- stopCh chan struct{}
+ stopCh chan struct{}
}
func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh events.Handler, pq priorityqueue.Queue) (*JobConsumer, error) {
@@ -47,7 +46,6 @@ func NewJobBroker(configKey string, log logger.Logger, cfg config.Configurer, eh
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
err := cfg.UnmarshalKey(configKey, &jb.cfg)
@@ -76,7 +74,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, eh events.Hand
eh: eh,
goroutinesMaxNum: 1000,
stopCh: make(chan struct{}, 1),
- requeueCh: make(chan *Item, 1000),
}
// initialize a local queue
@@ -147,10 +144,14 @@ func (j *JobConsumer) consume() {
// redirect
for {
select {
- case item := <-j.localPrefetch:
+ case item, ok := <-j.localPrefetch:
+ if !ok {
+ j.log.Warn("ephemeral local prefetch queue was closed")
+ return
+ }
// set requeue channel
- item.Options.requeueCh = j.requeueCh
+ item.Options.requeueCh = j.localPrefetch
j.pq.Insert(item)
case <-j.stopCh:
return
diff --git a/plugins/jobs/response_protocol.md b/plugins/jobs/response_protocol.md
index a9230ffc..577317d4 100644
--- a/plugins/jobs/response_protocol.md
+++ b/plugins/jobs/response_protocol.md
@@ -12,9 +12,10 @@ Types are:
2 - ...
```
-- `NO_ERROR`: contains only `type` and empty `data`.
-- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the job,
-`dalay_seconds`: to delay a queue for a provided amount of seconds.
+- `NO_ERROR`: contains only `type` and empty `data`.
+- `ERROR` : contains `type`: 1, and `data` field with: `message` describing the error, `requeue` flag to requeue the
+ job,
+ `dalay_seconds`: to delay a queue for a provided amount of seconds.
For example:
@@ -37,6 +38,12 @@ For example:
"data": {
"message": "internal worker error",
"requeue": true,
+ "headers": [
+ {
+ "test": "1",
+ "test2": "2"
+ }
+ ],
"delay_seconds": 10
}
}