diff options
author | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-11 22:03:34 +0300 |
commit | 2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch) | |
tree | d796a11941fab4be668843a3fcbd83ea0859db39 /plugins/jobs/drivers/amqp/item.go | |
parent | e855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff) |
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/item.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 37 |
1 files changed, 28 insertions, 9 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index 908dbd15..f252acd8 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -1,6 +1,8 @@ package amqp import ( + "context" + "fmt" "time" json "github.com/json-iterator/go" @@ -52,7 +54,7 @@ type Options struct { // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error - requeueCh chan *Item + requeueFn func(context.Context, *Item) error multipleAsk bool requeue bool @@ -118,12 +120,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { // overwrite the delay i.Options.Delay = delay i.Headers = headers - select { - case i.Options.requeueCh <- i: - return nil - default: - return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh)) + + err := i.Options.requeueFn(context.Background(), i) + if err != nil { + errAck := i.Options.nack(false, true) + if errAck != nil { + return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck) + } + + return err } + + // ack the job + err = i.Options.ack(false) + if err != nil { + return err + } + + return nil +} + +func (i *Item) Recycle() { + i.Options = nil } // fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ @@ -144,8 +162,9 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) { item.Options.ack = d.Ack item.Options.nack = d.Nack - // requeue channel - item.Options.requeueCh = j.requeueCh + + // requeue func + item.Options.requeueFn = j.handleItem return i, nil } @@ -186,7 +205,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) { item := &Item{Payload: utils.AsString(d.Body), Options: &Options{ multipleAsk: j.multipleAck, requeue: j.requeueOnFail, - requeueCh: j.requeueCh, + requeueFn: j.handleItem, }} if _, ok := d.Headers[job.RRID].(string); !ok { |