diff options
author | Valery Piashchynski <[email protected]> | 2021-08-10 19:54:03 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-10 19:54:03 +0300 |
commit | a8a7f4194156440ef3157d8e5d75c43ed0327bcf (patch) | |
tree | 9bc4240fb3c6f02682420689490f56d681d4b545 /plugins/jobs/drivers/amqp/requeue.go | |
parent | d379c28a1e9babead0266bc4fa10d6c5e7aa14cb (diff) |
Add jobs protocol support for the AMQP driver
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/requeue.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/requeue.go | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/amqp/requeue.go b/plugins/jobs/drivers/amqp/requeue.go new file mode 100644 index 00000000..a2b3b26c --- /dev/null +++ b/plugins/jobs/drivers/amqp/requeue.go @@ -0,0 +1,34 @@ +package amqp + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + pch := <-j.publishChan + + headers, err := pack(item.ID(), item) + if err != nil { + j.publishChan <- pch + j.log.Error("requeue pack", "error", err) + continue + } + + err = j.handleItem(item, headers, pch) + if err != nil { + j.publishChan <- pch + j.log.Error("requeue handle item", "error", err) + continue + } + + j.publishChan <- pch + } + } + }() +} |