diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 45 |
1 files changed, 33 insertions, 12 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 2d0d591c..5b549874 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -65,11 +65,12 @@ type JobsConsumer struct { exchangeType string routingKey string - // TODO send data to channel + delayCache map[string]struct{} + stop chan struct{} } -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) { +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh chan struct{}, pq priorityqueue.Queue) (jobs.Consumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -78,8 +79,9 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, logger: log, pq: pq, consumeID: uuid.NewString(), - stop: make(chan struct{}), + stop: stopCh, retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), } // if no such key - error @@ -156,6 +158,10 @@ func (j *JobsConsumer) Push(job *structs.Job) error { // convert msg := FromJob(job) + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } // handle timeouts if job.Options.DelayDuration() > 0 { @@ -163,7 +169,28 @@ func (j *JobsConsumer) Push(job *structs.Job) error { delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + // delay cache optimization. + // If user already declared a queue with a delay, do not redeclare and rebind the queue + // Before -> 2.5k RPS with redeclaration + // After -> 30k RPS + if _, exists := j.delayCache[tmpQ]; exists { + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: p, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + } + + _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ dlx: j.exchangeName, dlxRoutingKey: j.routingKey, dlxTTL: delayMs, @@ -179,10 +206,6 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return errors.E(op, err) } - p, err := pack(job.Ident, msg) - if err != nil { - return errors.E(op, err) - } // insert to the local, limited pipeline err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: p, @@ -196,13 +219,11 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return errors.E(op, err) } + j.delayCache[tmpQ] = struct{}{} + return nil } - p, err := pack(job.Ident, msg) - if err != nil { - return errors.E(op, err) - } // insert to the local, limited pipeline err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ Headers: p, |