diff options
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 45 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/item.go | 19 | ||||
-rw-r--r-- | plugins/jobs/brokers/amqp/plugin.go | 21 |
3 files changed, 63 insertions, 22 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 2d0d591c..5b549874 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -65,11 +65,12 @@ type JobsConsumer struct { exchangeType string routingKey string - // TODO send data to channel + delayCache map[string]struct{} + stop chan struct{} } -func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (jobs.Consumer, error) { +func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, stopCh chan struct{}, pq priorityqueue.Queue) (jobs.Consumer, error) { const op = errors.Op("new_amqp_consumer") // we need to obtain two parts of the amqp information here. // firs part - address to connect, it is located in the global section under the amqp pluginName @@ -78,8 +79,9 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, logger: log, pq: pq, consumeID: uuid.NewString(), - stop: make(chan struct{}), + stop: stopCh, retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), } // if no such key - error @@ -156,6 +158,10 @@ func (j *JobsConsumer) Push(job *structs.Job) error { // convert msg := FromJob(job) + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } // handle timeouts if job.Options.DelayDuration() > 0 { @@ -163,7 +169,28 @@ func (j *JobsConsumer) Push(job *structs.Job) error { delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + // delay cache optimization. + // If user already declared a queue with a delay, do not redeclare and rebind the queue + // Before -> 2.5k RPS with redeclaration + // After -> 30k RPS + if _, exists := j.delayCache[tmpQ]; exists { + // insert to the local, limited pipeline + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: p, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + } + + _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ dlx: j.exchangeName, dlxRoutingKey: j.routingKey, dlxTTL: delayMs, @@ -179,10 +206,6 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return errors.E(op, err) } - p, err := pack(job.Ident, msg) - if err != nil { - return errors.E(op, err) - } // insert to the local, limited pipeline err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: p, @@ -196,13 +219,11 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return errors.E(op, err) } + j.delayCache[tmpQ] = struct{}{} + return nil } - p, err := pack(job.Ident, msg) - if err != nil { - return errors.E(op, err) - } // insert to the local, limited pipeline err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ Headers: p, diff --git a/plugins/jobs/brokers/amqp/item.go b/plugins/jobs/brokers/amqp/item.go index 731e6a2b..2e8a30af 100644 --- a/plugins/jobs/brokers/amqp/item.go +++ b/plugins/jobs/brokers/amqp/item.go @@ -1,7 +1,6 @@ package amqp import ( - "fmt" "time" json "github.com/json-iterator/go" @@ -23,13 +22,13 @@ const ( func FromDelivery(d amqp.Delivery) (*Item, error) { const op = errors.Op("from_delivery_convert") - id, item, err := unpack(d) + item, err := unpack(d) if err != nil { return nil, errors.E(op, err) } return &Item{ Job: item.Job, - Ident: id, + Ident: item.Ident, Payload: item.Payload, Headers: item.Headers, Options: item.Options, @@ -173,15 +172,17 @@ func pack(id string, j *Item) (amqp.Table, error) { } // unpack restores jobs.Options -func unpack(d amqp.Delivery) (id string, j *Item, err error) { - j = &Item{Payload: utils.AsString(d.Body), Options: &Options{}} +func unpack(d amqp.Delivery) (*Item, error) { + j := &Item{Payload: utils.AsString(d.Body), Options: &Options{}} if _, ok := d.Headers[rrID].(string); !ok { - return "", nil, fmt.Errorf("missing header `%s`", rrID) + return nil, errors.E(errors.Errorf("missing header `%s`", rrID)) } + j.Ident = d.Headers[rrID].(string) + if _, ok := d.Headers[rrJob].(string); !ok { - return "", nil, fmt.Errorf("missing header `%s`", rrJob) + return nil, errors.E(errors.Errorf("missing header `%s`", rrJob)) } j.Job = d.Headers[rrJob].(string) @@ -193,7 +194,7 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) { if h, ok := d.Headers[rrHeaders].([]byte); ok { err := json.Unmarshal(h, &j.Headers) if err != nil { - return "", nil, err + return nil, err } } @@ -209,5 +210,5 @@ func unpack(d amqp.Delivery) (id string, j *Item, err error) { j.Options.RetryDelay = d.Headers[rrRetryDelay].(int32) } - return d.Headers[rrID].(string), j, nil + return j, nil } diff --git a/plugins/jobs/brokers/amqp/plugin.go b/plugins/jobs/brokers/amqp/plugin.go index 377d8648..7b6562c7 100644 --- a/plugins/jobs/brokers/amqp/plugin.go +++ b/plugins/jobs/brokers/amqp/plugin.go @@ -1,6 +1,8 @@ package amqp import ( + "sync/atomic" + "github.com/spiral/roadrunner/v2/common/jobs" priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" @@ -14,11 +16,27 @@ const ( type Plugin struct { log logger.Logger cfg config.Configurer + + numConsumers uint32 + stopCh chan struct{} } func (p *Plugin) Init(log logger.Logger, cfg config.Configurer) error { p.log = log p.cfg = cfg + p.stopCh = make(chan struct{}) + return nil +} + +func (p *Plugin) Serve() chan error { + return make(chan error) +} + +func (p *Plugin) Stop() error { + // send stop to the all consumers delivery + for i := uint32(0); i < atomic.LoadUint32(&p.numConsumers); i++ { + p.stopCh <- struct{}{} + } return nil } @@ -29,5 +47,6 @@ func (p *Plugin) Name() string { func (p *Plugin) Available() {} func (p *Plugin) JobsConstruct(configKey string, pq priorityqueue.Queue) (jobs.Consumer, error) { - return NewAMQPConsumer(configKey, p.log, p.cfg, pq) + atomic.AddUint32(&p.numConsumers, 1) + return NewAMQPConsumer(configKey, p.log, p.cfg, p.stopCh, pq) } |