summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-11 11:50:50 +0300
committerValery Piashchynski <[email protected]>2021-07-11 11:50:50 +0300
commit240b114e1ea3c1414bcd9f4d2c050d56c467222f (patch)
tree3c6a9e29c183492e6925488c24b9f65ca9c83fc7
parent510e19376df7882491e123cbfd2790a04ba31147 (diff)
Dead letter exchange optimization. Cache for the DLX queues. If the
queue had been declared and binded to the particular timeout, we can avoid re-declaring the queue. This optimization increases RPS to the DLX from 3.5k to 35k. Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go45
-rw-r--r--plugins/jobs/brokers/amqp/item.go19
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go21
-rw-r--r--tests/plugins/jobs/configs/.rr-jobs-init.yaml1
4 files changed, 63 insertions, 23 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 2d0d591c..5b549874 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -65,11 +65,12 @@ type JobsConsumer struct {
exchangeType string
routingKey string
- // TODO send data to channel
+ delayCache map[string]struct{}
+
stop chan struct{}
}
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) {
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh chan struct{}, pq priorityqueue.Queue) (jobs.Consumer, error) {
const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -78,8 +79,9 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
logger: log,
pq: pq,
consumeID: uuid.NewString(),
- stop: make(chan struct{}),
+ stop: stopCh,
retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
}
// if no such key - error
@@ -156,6 +158,10 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// convert
msg := FromJob(job)
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
// handle timeouts
if job.Options.DelayDuration() > 0 {
@@ -163,7 +169,28 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ // delay cache optimization.
+ // If user already declared a queue with a delay, do not redeclare and rebind the queue
+ // Before -> 2.5k RPS with redeclaration
+ // After -> 30k RPS
+ if _, exists := j.delayCache[tmpQ]; exists {
+ // insert to the local, limited pipeline
+ err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: p,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
dlx: j.exchangeName,
dlxRoutingKey: j.routingKey,
dlxTTL: delayMs,
@@ -179,10 +206,6 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return errors.E(op, err)
}
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: p,
@@ -196,13 +219,11 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return errors.E(op, err)
}
+ j.delayCache[tmpQ] = struct{}{}
+
return nil
}
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
Headers: p,
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 731e6a2b..2e8a30af 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -1,7 +1,6 @@
package amqp
import (
- "fmt"
"time"
json "github.com/json-iterator/go"
@@ -23,13 +22,13 @@ const (
func FromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
- id, item, err := unpack(d)
+ item, err := unpack(d)
if err != nil {
return nil, errors.E(op, err)
}
return &Item{
Job: item.Job,
- Ident: id,
+ Ident: item.Ident,
Payload: item.Payload,
Headers: item.Headers,
Options: item.Options,
@@ -173,15 +172,17 @@ func pack(id string, j *Item) (amqp.Table, error) {
}
// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (id string, j *Item, err error) {
- j = &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+func unpack(d amqp.Delivery) (*Item, error) {
+ j := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
if _, ok := d.Headers[rrID].(string); !ok {
- return "", nil, fmt.Errorf("missing header `%s`", rrID)
+ return nil, errors.E(errors.Errorf("missing header `%s`", rrID))
}
+ j.Ident = d.Headers[rrID].(string)
+
if _, ok := d.Headers[rrJob].(string); !ok {
- return "", nil, fmt.Errorf("missing header `%s`", rrJob)
+ return nil, errors.E(errors.Errorf("missing header `%s`", rrJob))
}
j.Job = d.Headers[rrJob].(string)
@@ -193,7 +194,7 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) {
if h, ok := d.Headers[rrHeaders].([]byte); ok {
err := json.Unmarshal(h, &j.Headers)
if err != nil {
- return "", nil, err
+ return nil, err
}
}
@@ -209,5 +210,5 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) {
j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32)
}
- return d.Headers[rrID].(string), j, nil
+ return j, nil
}
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 377d8648..7b6562c7 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -1,6 +1,8 @@
package amqp
import (
+ "sync/atomic"
+
"github.com/spiral/roadrunner/v2/common/jobs"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -14,11 +16,27 @@ const (
type Plugin struct {
log logger.Logger
cfg config.Configurer
+
+ numConsumers uint32
+ stopCh chan struct{}
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
p.cfg = cfg
+ p.stopCh = make(chan struct{})
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ // send stop to the all consumers delivery
+ for i := uint32(0); i < atomic.LoadUint32(&p.numConsumers); i++ {
+ p.stopCh <- struct{}{}
+ }
return nil
}
@@ -29,5 +47,6 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewAMQPConsumer(configKey, p.log, p.cfg, pq)
+ atomic.AddUint32(&p.numConsumers, 1)
+ return NewAMQPConsumer(configKey, p.log, p.cfg, p.stopCh, pq)
}
diff --git a/tests/plugins/jobs/configs/.rr-jobs-init.yaml b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
index 93c978c2..6efc760b 100644
--- a/tests/plugins/jobs/configs/.rr-jobs-init.yaml
+++ b/tests/plugins/jobs/configs/.rr-jobs-init.yaml
@@ -9,7 +9,6 @@ server:
amqp:
addr: amqp://guest:guest@localhost:5672/
-
# beanstalk configuration
beanstalk:
addr: tcp://localhost:11300