summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/item.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
committerValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
commit2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch)
treed796a11941fab4be668843a3fcbd83ea0859db39 /plugins/jobs/drivers/amqp/item.go
parente855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff)
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/item.go')
-rw-r--r--plugins/jobs/drivers/amqp/item.go37
1 files changed, 28 insertions, 9 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 908dbd15..f252acd8 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -1,6 +1,8 @@
package amqp
import (
+ "context"
+ "fmt"
"time"
json "github.com/json-iterator/go"
@@ -52,7 +54,7 @@ type Options struct {
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
multipleAsk bool
requeue bool
@@ -118,12 +120,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ errAck := i.Options.nack(false, true)
+ if errAck != nil {
+ return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck)
+ }
+
+ return err
}
+
+ // ack the job
+ err = i.Options.ack(false)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
@@ -144,8 +162,9 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
- // requeue channel
- item.Options.requeueCh = j.requeueCh
+
+ // requeue func
+ item.Options.requeueFn = j.handleItem
return i, nil
}
@@ -186,7 +205,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
- requeueCh: j.requeueCh,
+ requeueFn: j.handleItem,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {