From de37ed3ae8d08a50d9ffe088c1d58d9dffdf7c9b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 11 Aug 2021 13:42:33 +0300 Subject: Add headers support to the jobs protocol Signed-off-by: Valery Piashchynski --- plugins/jobs/drivers/amqp/item.go | 3 ++- plugins/jobs/drivers/beanstalk/item.go | 4 +++- plugins/jobs/drivers/ephemeral/item.go | 3 ++- plugins/jobs/drivers/sqs/item.go | 3 ++- 4 files changed, 9 insertions(+), 4 deletions(-) (limited to 'plugins/jobs/drivers') diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 1a7ce00e..908dbd15 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -114,9 +114,10 @@ func (i *Item) Nack() error { } // Requeue with the provided delay, handled by the Nack -func (i *Item) Requeue(delay int64) error { +func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay + i.Headers = headers select { case i.Options.requeueCh <- i: return nil diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go index 91dbf41c..50c54b12 100644 --- a/plugins/jobs/drivers/beanstalk/item.go +++ b/plugins/jobs/drivers/beanstalk/item.go @@ -111,9 +111,10 @@ func (i *Item) Nack() error { return i.Options.conn.Delete(i.Options.id) } -func (i *Item) Requeue(delay int64) error { +func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay + i.Headers = headers select { case i.Options.requeueCh <- i: return nil @@ -122,6 +123,7 @@ func (i *Item) Requeue(delay int64) error { } } + func fromJob(job *job.Job) *Item { return &Item{ Job: job.Job, diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go index d140c9ed..c1171ae2 100644 --- a/plugins/jobs/drivers/ephemeral/item.go +++ b/plugins/jobs/drivers/ephemeral/item.go @@ -100,11 +100,12 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(delay int64) error { +func (i *Item) Requeue(headers map[string][]string, delay int64) error { go func() { time.Sleep(time.Second * time.Duration(delay)) // overwrite the delay i.Options.Delay = delay + i.Headers = headers select { case i.Options.requeueCh <- i: return diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go index cd2f6104..a761d6bd 100644 --- a/plugins/jobs/drivers/sqs/item.go +++ b/plugins/jobs/drivers/sqs/item.go @@ -140,9 +140,10 @@ func (i *Item) Nack() error { return nil } -func (i *Item) Requeue(delay int64) error { +func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay + i.Headers = headers select { case i.Options.requeueCh <- i: return nil -- cgit v1.2.3