diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/item.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 41 |
1 files changed, 24 insertions, 17 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 6b544620..a7e2c4e5 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -70,22 +70,22 @@ func (o *Options) TimeoutDuration() time.Duration { return time.Second * time.Duration(o.Timeout) } -func (j *Item) ID() string { - return j.Ident +func (i *Item) ID() string { + return i.Ident } -func (j *Item) Priority() int64 { - return j.Options.Priority +func (i *Item) Priority() int64 { + return i.Options.Priority } // Body packs job payload into binary payload. -func (j *Item) Body() []byte { - return utils.AsBytes(j.Payload) +func (i *Item) Body() []byte { + return utils.AsBytes(i.Payload) } // Context packs job context (job, id) into binary payload. // Not used in the amqp, amqp.Table used instead -func (j *Item) Context() ([]byte, error) { +func (i *Item) Context() ([]byte, error) { ctx, err := json.Marshal( struct { ID string `json:"id"` @@ -93,7 +93,7 @@ func (j *Item) Context() ([]byte, error) { Headers map[string][]string `json:"headers"` Timeout int64 `json:"timeout"` Pipeline string `json:"pipeline"` - }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline}, + }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline}, ) if err != nil { @@ -103,14 +103,20 @@ func (j *Item) Context() ([]byte, error) { return ctx, nil } -func (j *Item) Ack() error { - return j.AckFunc(j.Options.multipleAsk) +func (i *Item) Ack() error { + return i.AckFunc(i.Options.multipleAsk) } -func (j *Item) Nack() error { - return j.NackFunc(false, j.Options.requeue) +func (i *Item) Nack() error { + return i.NackFunc(false, i.Options.requeue) } +// Requeue with the provided delay, handled by the Nack +func (i *Item) Requeue(_ uint32) error { + return nil +} + +// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") item, err := j.unpack(d) @@ -118,11 +124,12 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { return nil, errors.E(op, err) } return &Item{ - Job: item.Job, - Ident: item.Ident, - Payload: item.Payload, - Headers: item.Headers, - Options: item.Options, + Job: item.Job, + Ident: item.Ident, + Payload: item.Payload, + Headers: item.Headers, + Options: item.Options, + // internal AckFunc: d.Ack, NackFunc: d.Nack, }, nil |