summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 13:42:33 +0300
committerValery Piashchynski <[email protected]>2021-08-11 13:42:33 +0300
commitde37ed3ae8d08a50d9ffe088c1d58d9dffdf7c9b (patch)
tree712a47dde5941dd53c9f12ae41e62384df57b4b4 /plugins/jobs/drivers
parent2924f4b4daa3c53408a036583dcc39f6de805e2b (diff)
Add headers support to the jobs protocol
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r--plugins/jobs/drivers/amqp/item.go3
-rw-r--r--plugins/jobs/drivers/beanstalk/item.go4
-rw-r--r--plugins/jobs/drivers/ephemeral/item.go3
-rw-r--r--plugins/jobs/drivers/sqs/item.go3
4 files changed, 9 insertions, 4 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 1a7ce00e..908dbd15 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -114,9 +114,10 @@ func (i *Item) Nack() error {
}
// Requeue with the provided delay, handled by the Nack
-func (i *Item) Requeue(delay int64) error {
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
+ i.Headers = headers
select {
case i.Options.requeueCh <- i:
return nil
diff --git a/plugins/jobs/drivers/beanstalk/item.go b/plugins/jobs/drivers/beanstalk/item.go
index 91dbf41c..50c54b12 100644
--- a/plugins/jobs/drivers/beanstalk/item.go
+++ b/plugins/jobs/drivers/beanstalk/item.go
@@ -111,9 +111,10 @@ func (i *Item) Nack() error {
return i.Options.conn.Delete(i.Options.id)
}
-func (i *Item) Requeue(delay int64) error {
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
+ i.Headers = headers
select {
case i.Options.requeueCh <- i:
return nil
@@ -122,6 +123,7 @@ func (i *Item) Requeue(delay int64) error {
}
}
+
func fromJob(job *job.Job) *Item {
return &Item{
Job: job.Job,
diff --git a/plugins/jobs/drivers/ephemeral/item.go b/plugins/jobs/drivers/ephemeral/item.go
index d140c9ed..c1171ae2 100644
--- a/plugins/jobs/drivers/ephemeral/item.go
+++ b/plugins/jobs/drivers/ephemeral/item.go
@@ -100,11 +100,12 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(delay int64) error {
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
go func() {
time.Sleep(time.Second * time.Duration(delay))
// overwrite the delay
i.Options.Delay = delay
+ i.Headers = headers
select {
case i.Options.requeueCh <- i:
return
diff --git a/plugins/jobs/drivers/sqs/item.go b/plugins/jobs/drivers/sqs/item.go
index cd2f6104..a761d6bd 100644
--- a/plugins/jobs/drivers/sqs/item.go
+++ b/plugins/jobs/drivers/sqs/item.go
@@ -140,9 +140,10 @@ func (i *Item) Nack() error {
return nil
}
-func (i *Item) Requeue(delay int64) error {
+func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
+ i.Headers = headers
select {
case i.Options.requeueCh <- i:
return nil