diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/item.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 50 |
1 files changed, 32 insertions, 18 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index a7e2c4e5..1a7ce00e 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -25,15 +25,6 @@ type Item struct { // Options contains set of PipelineOptions specific to job execution. Can be empty. Options *Options `json:"options,omitempty"` - - // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery - AckFunc func(multiply bool) error - - // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. - // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. - // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. - // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time - NackFunc func(multiply bool, requeue bool) error } // Options carry information about how to handle given job. @@ -52,6 +43,17 @@ type Options struct { Timeout int64 `json:"timeout,omitempty"` // private + // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + ack func(multiply bool) error + + // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. + // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. + // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time + nack func(multiply bool, requeue bool) error + + requeueCh chan *Item + multipleAsk bool requeue bool } @@ -104,16 +106,23 @@ func (i *Item) Context() ([]byte, error) { } func (i *Item) Ack() error { - return i.AckFunc(i.Options.multipleAsk) + return i.Options.ack(i.Options.multipleAsk) } func (i *Item) Nack() error { - return i.NackFunc(false, i.Options.requeue) + return i.Options.nack(false, i.Options.requeue) } // Requeue with the provided delay, handled by the Nack -func (i *Item) Requeue(_ uint32) error { - return nil +func (i *Item) Requeue(delay int64) error { + // overwrite the delay + i.Options.Delay = delay + select { + case i.Options.requeueCh <- i: + return nil + default: + return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + } } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ @@ -123,16 +132,20 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { if err != nil { return nil, errors.E(op, err) } - return &Item{ + + i := &Item{ Job: item.Job, Ident: item.Ident, Payload: item.Payload, Headers: item.Headers, Options: item.Options, - // internal - AckFunc: d.Ack, - NackFunc: d.Nack, - }, nil + } + + item.Options.ack = d.Ack + item.Options.nack = d.Nack + // requeue channel + item.Options.requeueCh = j.requeueCh + return i, nil } func fromJob(job *job.Job) *Item { @@ -172,6 +185,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ multipleAsk: j.multipleAck, requeue: j.requeueOnFail, + requeueCh: j.requeueCh, }} if _, ok := d.Headers[job.RRID].(string); !ok { |