diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/item.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 31 |
1 files changed, 15 insertions, 16 deletions
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index ddb4e291..7f1bf204 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -4,23 +4,17 @@ import ( "time" json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/plugins/jobs/structs" "github.com/spiral/roadrunner/v2/utils" + "github.com/streadway/amqp" ) -func From(job *structs.Job) *Item { +func From(d amqp.Delivery) *Item { return &Item{ - Job: job.Job, - Ident: job.Ident, - Payload: job.Payload, - Options: conv(*job.Options), + AckFunc: d.Ack, + NackFunc: d.Nack, } } -func conv(jo structs.Options) Options { - return Options(jo) -} - type Item struct { // Job contains name of job broker (usually PHP class). Job string `json:"job"` @@ -37,9 +31,14 @@ type Item struct { // Options contains set of PipelineOptions specific to job execution. Can be empty. Options Options `json:"options,omitempty"` - AckFunc func() + // Ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery + AckFunc func(multiply bool) error - NackFunc func() + // Nack negatively acknowledge the delivery of message(s) identified by the delivery tag from either the client or server. + // When multiple is true, nack messages up to and including delivered messages up until the delivery tag delivered on the same channel. + // When requeue is true, request the server to deliver this message to a different consumer. If it is not possible or requeue is false, the message will be dropped or delivered to a server configured dead-letter queue. + // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time + NackFunc func(multiply bool, requeue bool) error } // Options carry information about how to handle given job. @@ -121,10 +120,10 @@ func (j *Item) Context() ([]byte, error) { return ctx, nil } -func (j *Item) Ack() { - // noop for the in-memory +func (j *Item) Ack() error { + return j.AckFunc(false) } -func (j *Item) Nack() { - // noop for the in-memory +func (j *Item) Nack() error { + return j.NackFunc(false, false) } |