diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/requeue.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/requeue.go | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/amqp/requeue.go b/plugins/jobs/drivers/amqp/requeue.go new file mode 100644 index 00000000..a2b3b26c --- /dev/null +++ b/plugins/jobs/drivers/amqp/requeue.go @@ -0,0 +1,34 @@ +package amqp + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + pch := <-j.publishChan + + headers, err := pack(item.ID(), item) + if err != nil { + j.publishChan <- pch + j.log.Error("requeue pack", "error", err) + continue + } + + err = j.handleItem(item, headers, pch) + if err != nil { + j.publishChan <- pch + j.log.Error("requeue handle item", "error", err) + continue + } + + j.publishChan <- pch + } + } + }() +} |