summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/item.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/amqp/item.go')
-rw-r--r--plugins/jobs/drivers/amqp/item.go37
1 files changed, 28 insertions, 9 deletions
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 908dbd15..f252acd8 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -1,6 +1,8 @@
package amqp
import (
+ "context"
+ "fmt"
"time"
json "github.com/json-iterator/go"
@@ -52,7 +54,7 @@ type Options struct {
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
- requeueCh chan *Item
+ requeueFn func(context.Context, *Item) error
multipleAsk bool
requeue bool
@@ -118,12 +120,28 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
- select {
- case i.Options.requeueCh <- i:
- return nil
- default:
- return errors.E("can't push to the requeue channel, channel either closed or full", "current size", len(i.Options.requeueCh))
+
+ err := i.Options.requeueFn(context.Background(), i)
+ if err != nil {
+ errAck := i.Options.nack(false, true)
+ if errAck != nil {
+ return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck)
+ }
+
+ return err
}
+
+ // ack the job
+ err = i.Options.ack(false)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (i *Item) Recycle() {
+ i.Options = nil
}
// fromDelivery converts amqp.Delivery into an Item which will be pushed to the PQ
@@ -144,8 +162,9 @@ func (j *JobConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
item.Options.ack = d.Ack
item.Options.nack = d.Nack
- // requeue channel
- item.Options.requeueCh = j.requeueCh
+
+ // requeue func
+ item.Options.requeueFn = j.handleItem
return i, nil
}
@@ -186,7 +205,7 @@ func (j *JobConsumer) unpack(d amqp.Delivery) (*Item, error) {
item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
multipleAsk: j.multipleAck,
requeue: j.requeueOnFail,
- requeueCh: j.requeueCh,
+ requeueFn: j.handleItem,
}}
if _, ok := d.Headers[job.RRID].(string); !ok {