summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/amqp/amqpjobs/item.go')
-rw-r--r--plugins/amqp/amqpjobs/item.go18
1 files changed, 9 insertions, 9 deletions
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go
index a8e305ea..66b70a36 100644
--- a/plugins/amqp/amqpjobs/item.go
+++ b/plugins/amqp/amqpjobs/item.go
@@ -139,9 +139,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
-func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
+func (c *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
- item, err := j.unpack(d)
+ item, err := c.unpack(d)
if err != nil {
return nil, errors.E(op, err)
}
@@ -156,10 +156,10 @@ func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
- item.Options.delayed = j.delayed
+ item.Options.delayed = c.delayed
// requeue func
- item.Options.requeueFn = j.handleItem
+ item.Options.requeueFn = c.handleItem
return i, nil
}
@@ -194,11 +194,11 @@ func pack(id string, j *Item) (amqp.Table, error) {
}
// unpack restores jobs.Options
-func (j *consumer) unpack(d amqp.Delivery) (*Item, error) {
+func (c *consumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
- multipleAsk: j.multipleAck,
- requeue: j.requeueOnFail,
- requeueFn: j.handleItem,
+ multipleAsk: c.multipleAck,
+ requeue: c.requeueOnFail,
+ requeueFn: c.handleItem,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {
@@ -230,7 +230,7 @@ func (j *consumer) unpack(d amqp.Delivery) (*Item, error) {
if _, ok := d.Headers[job.RRPriority]; !ok {
// set pipe's priority
- item.Options.Priority = j.priority
+ item.Options.Priority = c.priority
} else {
item.Options.Priority = d.Headers[job.RRPriority].(int64)
}