summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/amqp/item.go')
-rw-r--r--plugins/jobs/drivers/amqp/item.go13
1 files changed, 10 insertions, 3 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 6b912dde..bc679037 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -50,6 +50,10 @@ type Options struct {
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
+
+ // private
+ multipleAsk bool
+ requeue bool
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -100,11 +104,11 @@ func (j *Item) Context() ([]byte, error) {
}
func (j *Item) Ack() error {
- return j.AckFunc(false)
+ return j.AckFunc(j.Options.multipleAsk)
}
func (j *Item) Nack() error {
- return j.NackFunc(false, false)
+ return j.NackFunc(false, j.Options.requeue)
}
func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
@@ -157,7 +161,10 @@ func pack(id string, j *Item) (amqp.Table, error) {
// unpack restores jobs.Options
func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) {
- item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+ item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
+ multipleAsk: j.multipleAck,
+ requeue: j.requeueOnFail,
+ }}
if _, ok := d.Headers[job.RRID].(string); !ok {
return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))