summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/amqp/item.go')
-rw-r--r--plugins/jobs/drivers/amqp/item.go41
1 files changed, 24 insertions, 17 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 6b544620..a7e2c4e5 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -70,22 +70,22 @@ func (o *Options) TimeoutDuration() time.Duration {
return time.Second * time.Duration(o.Timeout)
}
-func (j *Item) ID() string {
- return j.Ident
+func (i *Item) ID() string {
+ return i.Ident
}
-func (j *Item) Priority() int64 {
- return j.Options.Priority
+func (i *Item) Priority() int64 {
+ return i.Options.Priority
}
// Body packs job payload into binary payload.
-func (j *Item) Body() []byte {
- return utils.AsBytes(j.Payload)
+func (i *Item) Body() []byte {
+ return utils.AsBytes(i.Payload)
}
// Context packs job context (job, id) into binary payload.
// Not used in the amqp, amqp.Table used instead
-func (j *Item) Context() ([]byte, error) {
+func (i *Item) Context() ([]byte, error) {
ctx, err := json.Marshal(
struct {
ID string `json:"id"`
@@ -93,7 +93,7 @@ func (j *Item) Context() ([]byte, error) {
Headers map[string][]string `json:"headers"`
Timeout int64 `json:"timeout"`
Pipeline string `json:"pipeline"`
- }{ID: j.Ident, Job: j.Job, Headers: j.Headers, Timeout: j.Options.Timeout, Pipeline: j.Options.Pipeline},
+ }{ID: i.Ident, Job: i.Job, Headers: i.Headers, Timeout: i.Options.Timeout, Pipeline: i.Options.Pipeline},
)
if err != nil {
@@ -103,14 +103,20 @@ func (j *Item) Context() ([]byte, error) {
return ctx, nil
}
-func (j *Item) Ack() error {
- return j.AckFunc(j.Options.multipleAsk)
+func (i *Item) Ack() error {
+ return i.AckFunc(i.Options.multipleAsk)
}
-func (j *Item) Nack() error {
- return j.NackFunc(false, j.Options.requeue)
+func (i *Item) Nack() error {
+ return i.NackFunc(false, i.Options.requeue)
}
+// Requeue with the provided delay, handled by the Nack
+func (i *Item) Requeue(_ uint32) error {
+ return nil
+}
+
+// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
item, err := j.unpack(d)
@@ -118,11 +124,12 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
return nil, errors.E(op, err)
}
return &Item{
- Job: item.Job,
- Ident: item.Ident,
- Payload: item.Payload,
- Headers: item.Headers,
- Options: item.Options,
+ Job: item.Job,
+ Ident: item.Ident,
+ Payload: item.Payload,
+ Headers: item.Headers,
+ Options: item.Options,
+ // internal
AckFunc: d.Ack,
NackFunc: d.Nack,
}, nil