summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go45
1 files changed, 33 insertions, 12 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 2d0d591c..5b549874 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -65,11 +65,12 @@ type JobsConsumer struct {
exchangeType string
routingKey string
- // TODO send data to channel
+ delayCache map[string]struct{}
+
stop chan struct{}
}
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) {
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh chan struct{}, pq priorityqueue.Queue) (jobs.Consumer, error) {
const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -78,8 +79,9 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
logger: log,
pq: pq,
consumeID: uuid.NewString(),
- stop: make(chan struct{}),
+ stop: stopCh,
retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
}
// if no such key - error
@@ -156,6 +158,10 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// convert
msg := FromJob(job)
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
// handle timeouts
if job.Options.DelayDuration() > 0 {
@@ -163,7 +169,28 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ // delay cache optimization.
+ // If user already declared a queue with a delay, do not redeclare and rebind the queue
+ // Before -> 2.5k RPS with redeclaration
+ // After -> 30k RPS
+ if _, exists := j.delayCache[tmpQ]; exists {
+ // insert to the local, limited pipeline
+ err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: p,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
dlx: j.exchangeName,
dlxRoutingKey: j.routingKey,
dlxTTL: delayMs,
@@ -179,10 +206,6 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return errors.E(op, err)
}
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: p,
@@ -196,13 +219,11 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return errors.E(op, err)
}
+ j.delayCache[tmpQ] = struct{}{}
+
return nil
}
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
Headers: p,