diff options
Diffstat (limited to 'plugins/amqp/amqpjobs/item.go')
-rw-r--r-- | plugins/amqp/amqpjobs/item.go | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/plugins/amqp/amqpjobs/item.go b/plugins/amqp/amqpjobs/item.go index a8e305ea..66b70a36 100644 --- a/plugins/amqp/amqpjobs/item.go +++ b/plugins/amqp/amqpjobs/item.go @@ -139,9 +139,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ -func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { +func (c *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") - item, err := j.unpack(d) + item, err := c.unpack(d) if err != nil { return nil, errors.E(op, err) } @@ -156,10 +156,10 @@ func (j *consumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack - item.Options.delayed = j.delayed + item.Options.delayed = c.delayed // requeue func - item.Options.requeueFn = j.handleItem + item.Options.requeueFn = c.handleItem return i, nil } @@ -194,11 +194,11 @@ func pack(id string, j *Item) (amqp.Table, error) { } // unpack restores jobs.Options -func (j *consumer) unpack(d amqp.Delivery) (*Item, error) { +func (c *consumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ - multipleAsk: j.multipleAck, - requeue: j.requeueOnFail, - requeueFn: j.handleItem, + multipleAsk: c.multipleAck, + requeue: c.requeueOnFail, + requeueFn: c.handleItem, }} if _, ok := d.Headers[job.RRID].(string); !ok { @@ -230,7 +230,7 @@ func (j *consumer) unpack(d amqp.Delivery) (*Item, error) { if _, ok := d.Headers[job.RRPriority]; !ok { // set pipe's priority - item.Options.Priority = j.priority + item.Options.Priority = c.priority } else { item.Options.Priority = d.Headers[job.RRPriority].(int64) } |