diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/item.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 6b912dde..bc679037 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -50,6 +50,10 @@ type Options struct { // Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min. Timeout int64 `json:"timeout,omitempty"` + + // private + multipleAsk bool + requeue bool } // DelayDuration returns delay duration in a form of time.Duration. @@ -100,11 +104,11 @@ func (j *Item) Context() ([]byte, error) { } func (j *Item) Ack() error { - return j.AckFunc(false) + return j.AckFunc(j.Options.multipleAsk) } func (j *Item) Nack() error { - return j.NackFunc(false, false) + return j.NackFunc(false, j.Options.requeue) } func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { @@ -157,7 +161,10 @@ func pack(id string, j *Item) (amqp.Table, error) { // unpack restores jobs.Options func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) { - item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}} + item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ + multipleAsk: j.multipleAck, + requeue: j.requeueOnFail, + }} if _, ok := d.Headers[job.RRID].(string); !ok { return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID)) |