summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/item.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-11 11:50:50 +0300
committerValery Piashchynski <[email protected]>2021-07-11 11:50:50 +0300
commit240b114e1ea3c1414bcd9f4d2c050d56c467222f (patch)
tree3c6a9e29c183492e6925488c24b9f65ca9c83fc7 /plugins/jobs/brokers/amqp/item.go
parent510e19376df7882491e123cbfd2790a04ba31147 (diff)
Dead letter exchange optimization. Cache for the DLX queues. If the
queue had been declared and binded to the particular timeout, we can avoid re-declaring the queue. This optimization increases RPS to the DLX from 3.5k to 35k. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/item.go')
-rw-r--r--plugins/jobs/brokers/amqp/item.go19
1 files changed, 10 insertions, 9 deletions
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 731e6a2b..2e8a30af 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -1,7 +1,6 @@
package amqp
import (
- "fmt"
"time"
json "github.com/json-iterator/go"
@@ -23,13 +22,13 @@ const (
func FromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
- id, item, err := unpack(d)
+ item, err := unpack(d)
if err != nil {
return nil, errors.E(op, err)
}
return &Item{
Job: item.Job,
- Ident: id,
+ Ident: item.Ident,
Payload: item.Payload,
Headers: item.Headers,
Options: item.Options,
@@ -173,15 +172,17 @@ func pack(id string, j *Item) (amqp.Table, error) {
}
// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (id string, j *Item, err error) {
- j = &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+func unpack(d amqp.Delivery) (*Item, error) {
+ j := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
if _, ok := d.Headers[rrID].(string); !ok {
- return "", nil, fmt.Errorf("missing header `%s`", rrID)
+ return nil, errors.E(errors.Errorf("missing header `%s`", rrID))
}
+ j.Ident = d.Headers[rrID].(string)
+
if _, ok := d.Headers[rrJob].(string); !ok {
- return "", nil, fmt.Errorf("missing header `%s`", rrJob)
+ return nil, errors.E(errors.Errorf("missing header `%s`", rrJob))
}
j.Job = d.Headers[rrJob].(string)
@@ -193,7 +194,7 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) {
if h, ok := d.Headers[rrHeaders].([]byte); ok {
err := json.Unmarshal(h, &j.Headers)
if err != nil {
- return "", nil, err
+ return nil, err
}
}
@@ -209,5 +210,5 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) {
j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32)
}
- return d.Headers[rrID].(string), j, nil
+ return j, nil
}