summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-11 11:50:50 +0300
committerValery Piashchynski <[email protected]>2021-07-11 11:50:50 +0300
commit240b114e1ea3c1414bcd9f4d2c050d56c467222f (patch)
tree3c6a9e29c183492e6925488c24b9f65ca9c83fc7 /plugins/jobs/brokers/amqp/plugin.go
parent510e19376df7882491e123cbfd2790a04ba31147 (diff)
Dead letter exchange optimization. Cache for the DLX queues. If the
queue had been declared and binded to the particular timeout, we can avoid re-declaring the queue. This optimization increases RPS to the DLX from 3.5k to 35k. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/plugin.go')
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go21
1 files changed, 20 insertions, 1 deletions
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 377d8648..7b6562c7 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -1,6 +1,8 @@
package amqp
import (
+ "sync/atomic"
+
"github.com/spiral/roadrunner/v2/common/jobs"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -14,11 +16,27 @@ const (
type Plugin struct {
log logger.Logger
cfg config.Configurer
+
+ numConsumers uint32
+ stopCh chan struct{}
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
p.cfg = cfg
+ p.stopCh = make(chan struct{})
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ // send stop to the all consumers delivery
+ for i := uint32(0); i < atomic.LoadUint32(&p.numConsumers); i++ {
+ p.stopCh <- struct{}{}
+ }
return nil
}
@@ -29,5 +47,6 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewAMQPConsumer(configKey, p.log, p.cfg, pq)
+ atomic.AddUint32(&p.numConsumers, 1)
+ return NewAMQPConsumer(configKey, p.log, p.cfg, p.stopCh, pq)
}