summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go45
-rw-r--r--plugins/jobs/brokers/amqp/item.go19
-rw-r--r--plugins/jobs/brokers/amqp/plugin.go21
3 files changed, 63 insertions, 22 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 2d0d591c..5b549874 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -65,11 +65,12 @@ type JobsConsumer struct {
exchangeType string
routingKey string
- // TODO send data to channel
+ delayCache map[string]struct{}
+
stop chan struct{}
}
-func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) {
+func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh chan struct{}, pq priorityqueue.Queue) (jobs.Consumer, error) {
const op = errors.Op("new_amqp_consumer")
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
@@ -78,8 +79,9 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
logger: log,
pq: pq,
consumeID: uuid.NewString(),
- stop: make(chan struct{}),
+ stop: stopCh,
retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
}
// if no such key - error
@@ -156,6 +158,10 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// convert
msg := FromJob(job)
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
// handle timeouts
if job.Options.DelayDuration() > 0 {
@@ -163,7 +169,28 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
- _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ // delay cache optimization.
+ // If user already declared a queue with a delay, do not redeclare and rebind the queue
+ // Before -> 2.5k RPS with redeclaration
+ // After -> 30k RPS
+ if _, exists := j.delayCache[tmpQ]; exists {
+ // insert to the local, limited pipeline
+ err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: p,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
dlx: j.exchangeName,
dlxRoutingKey: j.routingKey,
dlxTTL: delayMs,
@@ -179,10 +206,6 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return errors.E(op, err)
}
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: p,
@@ -196,13 +219,11 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return errors.E(op, err)
}
+ j.delayCache[tmpQ] = struct{}{}
+
return nil
}
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
Headers: p,
diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go
index 731e6a2b..2e8a30af 100644
--- a/plugins/jobs/brokers/amqp/item.go
+++ b/plugins/jobs/brokers/amqp/item.go
@@ -1,7 +1,6 @@
package amqp
import (
- "fmt"
"time"
json "github.com/json-iterator/go"
@@ -23,13 +22,13 @@ const (
func FromDelivery(d amqp.Delivery) (*Item, error) {
const op = errors.Op("from_delivery_convert")
- id, item, err := unpack(d)
+ item, err := unpack(d)
if err != nil {
return nil, errors.E(op, err)
}
return &Item{
Job: item.Job,
- Ident: id,
+ Ident: item.Ident,
Payload: item.Payload,
Headers: item.Headers,
Options: item.Options,
@@ -173,15 +172,17 @@ func pack(id string, j *Item) (amqp.Table, error) {
}
// unpack restores jobs.Options
-func unpack(d amqp.Delivery) (id string, j *Item, err error) {
- j = &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+func unpack(d amqp.Delivery) (*Item, error) {
+ j := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
if _, ok := d.Headers[rrID].(string); !ok {
- return "", nil, fmt.Errorf("missing header `%s`", rrID)
+ return nil, errors.E(errors.Errorf("missing header `%s`", rrID))
}
+ j.Ident = d.Headers[rrID].(string)
+
if _, ok := d.Headers[rrJob].(string); !ok {
- return "", nil, fmt.Errorf("missing header `%s`", rrJob)
+ return nil, errors.E(errors.Errorf("missing header `%s`", rrJob))
}
j.Job = d.Headers[rrJob].(string)
@@ -193,7 +194,7 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) {
if h, ok := d.Headers[rrHeaders].([]byte); ok {
err := json.Unmarshal(h, &j.Headers)
if err != nil {
- return "", nil, err
+ return nil, err
}
}
@@ -209,5 +210,5 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) {
j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32)
}
- return d.Headers[rrID].(string), j, nil
+ return j, nil
}
diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go
index 377d8648..7b6562c7 100644
--- a/plugins/jobs/brokers/amqp/plugin.go
+++ b/plugins/jobs/brokers/amqp/plugin.go
@@ -1,6 +1,8 @@
package amqp
import (
+ "sync/atomic"
+
"github.com/spiral/roadrunner/v2/common/jobs"
priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -14,11 +16,27 @@ const (
type Plugin struct {
log logger.Logger
cfg config.Configurer
+
+ numConsumers uint32
+ stopCh chan struct{}
}
func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
p.log = log
p.cfg = cfg
+ p.stopCh = make(chan struct{})
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ // send stop to the all consumers delivery
+ for i := uint32(0); i < atomic.LoadUint32(&p.numConsumers); i++ {
+ p.stopCh <- struct{}{}
+ }
return nil
}
@@ -29,5 +47,6 @@ func (p *Plugin) Name() string {
func (p *Plugin) Available() {}
func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) {
- return NewAMQPConsumer(configKey, p.log, p.cfg, pq)
+ atomic.AddUint32(&p.numConsumers, 1)
+ return NewAMQPConsumer(configKey, p.log, p.cfg, p.stopCh, pq)
}