From d449d9d5aec1eec6d494064299feb1551f88ffe2 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Tue, 10 Aug 2021 22:48:31 +0300 Subject: Add support for the jobs-worker protocol for the beanstalk,ephemeral and sqs drivers Signed-off-by: Valery Piashchynski --- plugins/jobs/drivers/ephemeral/requeue.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 plugins/jobs/drivers/ephemeral/requeue.go (limited to 'plugins/jobs/drivers/ephemeral/requeue.go') diff --git a/plugins/jobs/drivers/ephemeral/requeue.go b/plugins/jobs/drivers/ephemeral/requeue.go new file mode 100644 index 00000000..afb97d54 --- /dev/null +++ b/plugins/jobs/drivers/ephemeral/requeue.go @@ -0,0 +1,25 @@ +package ephemeral + +import "context" + +// requeueListener should handle items passed to requeue +func (j *JobConsumer) requeueListener() { + go func() { + for { //nolint:gosimple + select { + case item, ok := <-j.requeueCh: + if !ok { + j.log.Info("requeue channel closed") + return + } + + // TODO(rustatian): what timeout to use? + err := j.handleItem(context.TODO(), item) + if err != nil { + j.log.Error("requeue handle item", "error", err) + continue + } + } + } + }() +} -- cgit v1.2.3