diff options
author | Valery Piashchynski <[email protected]> | 2021-07-11 11:50:50 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-11 11:50:50 +0300 |
commit | 240b114e1ea3c1414bcd9f4d2c050d56c467222f (patch) | |
tree | 3c6a9e29c183492e6925488c24b9f65ca9c83fc7 /plugins/jobs/brokers/amqp/item.go | |
parent | 510e19376df7882491e123cbfd2790a04ba31147 (diff) |
Dead letter exchange optimization. Cache for the DLX queues. If the
queue had been declared and binded to the particular timeout, we can
avoid re-declaring the queue. This optimization increases RPS to the DLX
from 3.5k to 35k.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/item.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index 731e6a2b..2e8a30af 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -1,7 +1,6 @@ package amqp import ( - "fmt" "time" json "github.com/json-iterator/go" @@ -23,13 +22,13 @@ const ( func FromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") - id, item, err := unpack(d) + item, err := unpack(d) if err != nil { return nil, errors.E(op, err) } return &Item{ Job: item.Job, - Ident: id, + Ident: item.Ident, Payload: item.Payload, Headers: item.Headers, Options: item.Options, @@ -173,15 +172,17 @@ func pack(id string, j *Item) (amqp.Table, error) { } // unpack restores jobs.Options -func unpack(d amqp.Delivery) (id string, j *Item, err error) { - j = &Item{Payload: utils.AsString(d.Body), Options: &Options{}} +func unpack(d amqp.Delivery) (*Item, error) { + j := &Item{Payload: utils.AsString(d.Body), Options: &Options{}} if _, ok := d.Headers[rrID].(string); !ok { - return "", nil, fmt.Errorf("missing header `%s`", rrID) + return nil, errors.E(errors.Errorf("missing header `%s`", rrID)) } + j.Ident = d.Headers[rrID].(string) + if _, ok := d.Headers[rrJob].(string); !ok { - return "", nil, fmt.Errorf("missing header `%s`", rrJob) + return nil, errors.E(errors.Errorf("missing header `%s`", rrJob)) } j.Job = d.Headers[rrJob].(string) @@ -193,7 +194,7 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) { if h, ok := d.Headers[rrHeaders].([]byte); ok { err := json.Unmarshal(h, &j.Headers) if err != nil { - return "", nil, err + return nil, err } } @@ -209,5 +210,5 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) { j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32) } - return d.Headers[rrID].(string), j, nil + return j, nil } |