From 2d460062c97f9ad1e793831c54ae4d177dea83e8 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 11 Aug 2021 22:03:34 +0300 Subject: Durable requeue algo. Update AMQP and Beanstalk tests to use mock logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski --- plugins/jobs/drivers/amqp/item.go | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) (limited to 'plugins/jobs/drivers/amqp/item.go') diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 908dbd15..f252acd8 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -1,6 +1,8 @@ package amqp import ( + "context" + "fmt" "time" json "github.com/json-iterator/go" @@ -52,7 +54,7 @@ type Options struct { // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error - requeueCh chan *Item + requeueFn func(context.Context, *Item) error multipleAsk bool requeue bool @@ -118,12 +120,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + errAck := i.Options.nack(false, true) + if errAck != nil { + return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck) + } + + return err } + + // ack the job + err = i.Options.ack(false) + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ @@ -144,8 +162,9 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack - // requeue channel - item.Options.requeueCh = j.requeueCh + + // requeue func + item.Options.requeueFn = j.handleItem return i, nil } @@ -186,7 +205,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ multipleAsk: j.multipleAck, requeue: j.requeueOnFail, - requeueCh: j.requeueCh, + requeueFn: j.handleItem, }} if _, ok := d.Headers[job.RRID].(string); !ok { -- cgit v1.2.3