diff options
author | Valery Piashchynski <[email protected]> | 2021-08-10 22:48:31 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-10 22:48:31 +0300 |
commit | d449d9d5aec1eec6d494064299feb1551f88ffe2 (patch) | |
tree | a905126b44bcfab29af9b5bc3eddaf5398375975 /plugins/jobs/drivers/ephemeral/requeue.go | |
parent | a8a7f4194156440ef3157d8e5d75c43ed0327bcf (diff) |
Add support for the jobs-worker protocol for the beanstalk,ephemeral and
sqs drivers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/ephemeral/requeue.go')
-rw-r--r-- | plugins/jobs/drivers/ephemeral/requeue.go | 25 |
1 files changed, 25 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go new file mode 100644 index 00000000..afb97d54 --- /dev/null +++ b/plugins/jobs/drivers/ephemeral/requeue.go @@ -0,0 +1,25 @@ +package ephemeral + +import "context" + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + // TODO(rustatian): what timeout to use? + err := j.handleItem(context.TODO(), item) + if err != nil { + j.log.Error("requeue handle item", "error", err) + continue + } + } + } + }() +} |