summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/amqp/item.go')
-rw-r--r--plugins/jobs/drivers/amqp/item.go15
1 files changed, 13 insertions, 2 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 5990d137..623dcca7 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -3,6 +3,7 @@ package amqp
import (
"context"
"fmt"
+ "sync/atomic"
"time"
json "github.com/json-iterator/go"
@@ -52,8 +53,8 @@ type Options struct {
nack func(multiply bool, requeue bool) error
// requeueFn used as a pointer to the push function
- requeueFn func(context.Context, *Item) error
-
+ requeueFn func(context.Context, *Item) error
+ delayed *int64
multipleAsk bool
requeue bool
}
@@ -96,15 +97,24 @@ func (i *Item) Context() ([]byte, error) {
}
func (i *Item) Ack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.ack(i.Options.multipleAsk)
}
func (i *Item) Nack() error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
return i.Options.nack(false, i.Options.requeue)
}
// Requeue with the provided delay, handled by the Nack
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
+ if i.Options.Delay > 0 {
+ atomic.AddInt64(i.Options.delayed, ^int64(0))
+ }
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
@@ -146,6 +156,7 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
+ item.Options.delayed = j.delayed
// requeue func
item.Options.requeueFn = j.handleItem