diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 156 |
1 files changed, 89 insertions, 67 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 8c55399c..714a714a 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -1,6 +1,7 @@ package amqp import ( + "context" "fmt" "sync" "sync/atomic" @@ -28,12 +29,18 @@ type JobConsumer struct { // amqp connection conn *amqp.Connection consumeChan *amqp.Channel - publishChan *amqp.Channel + publishChan chan *amqp.Channel consumeID string connStr string - retryTimeout time.Duration - prefetch int + retryTimeout time.Duration + // + // prefetch QoS AMQP + // + prefetch int + // + // pipeline's priority + // priority int64 exchangeName string queue string @@ -95,6 +102,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, delayCache: make(map[string]struct{}, 100), priority: pipeCfg.Priority, + publishChan: make(chan *amqp.Channel, 1), routingKey: pipeCfg.RoutingKey, queue: pipeCfg.Queue, exchangeType: pipeCfg.ExchangeType, @@ -118,11 +126,13 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return nil, errors.E(op, err) } - jb.publishChan, err = jb.conn.Channel() + pch, err := jb.conn.Channel() if err != nil { return nil, errors.E(op, err) } + jb.publishChan <- pch + // run redialer for the connection jb.redialer() @@ -161,6 +171,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con retryTimeout: time.Minute * 5, delayCache: make(map[string]struct{}, 100), + publishChan: make(chan *amqp.Channel, 1), routingKey: pipeline.String(routingKey, ""), queue: pipeline.String(queue, "default"), exchangeType: pipeline.String(exchangeType, "direct"), @@ -185,14 +196,16 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return nil, errors.E(op, err) } - jb.publishChan, err = jb.conn.Channel() + pch, err := jb.conn.Channel() if err != nil { return nil, errors.E(op, err) } + jb.publishChan <- pch + // register the pipeline // error here is always nil - _ = jb.Register(pipeline) + _ = jb.Register(context.Background(), pipeline) // run redialer for the connection jb.redialer() @@ -200,7 +213,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con return jb, nil } -func (j *JobConsumer) Push(job *job.Job) error { +func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { const op = errors.Op("rabbitmq_push") // check if the pipeline registered @@ -212,32 +225,69 @@ func (j *JobConsumer) Push(job *job.Job) error { // lock needed here to protect redial concurrent operation // we may be in the redial state here - j.Lock() - defer j.Unlock() - // convert - msg := fromJob(job) - p, err := pack(job.Ident, msg) - if err != nil { - return errors.E(op, err) - } + select { + case pch := <-j.publishChan: + // return the channel back + defer func() { + j.publishChan <- pch + }() + + // convert + msg := fromJob(job) + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } + + // handle timeouts + if msg.Options.DelayDuration() > 0 { + // TODO declare separate method for this if condition + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + + // delay cache optimization. + // If user already declared a queue with a delay, do not redeclare and rebind the queue + // Before -> 2.5k RPS with redeclaration + // After -> 30k RPS + if _, exists := j.delayCache[tmpQ]; exists { + // insert to the local, limited pipeline + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: p, + ContentType: contentType, + Timestamp: time.Now().UTC(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) + } + + return nil + } + + _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + + if err != nil { + return errors.E(op, err) + } + + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + return errors.E(op, err) + } - // handle timeouts - if msg.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - - // delay cache optimization. - // If user already declared a queue with a delay, do not redeclare and rebind the queue - // Before -> 2.5k RPS with redeclaration - // After -> 30k RPS - if _, exists := j.delayCache[tmpQ]; exists { // insert to the local, limited pipeline - err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: p, ContentType: contentType, - Timestamp: time.Now(), + Timestamp: time.Now().UTC(), DeliveryMode: amqp.Persistent, Body: msg.Body(), }) @@ -246,64 +296,36 @@ func (j *JobConsumer) Push(job *job.Job) error { return errors.E(op, err) } - return nil - } - - _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) + j.delayCache[tmpQ] = struct{}{} - if err != nil { - return errors.E(op, err) - } - - err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - return errors.E(op, err) + return nil } // insert to the local, limited pipeline - err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ Headers: p, ContentType: contentType, Timestamp: time.Now(), DeliveryMode: amqp.Persistent, Body: msg.Body(), }) - if err != nil { return errors.E(op, err) } - j.delayCache[tmpQ] = struct{}{} - return nil - } - // insert to the local, limited pipeline - err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: p, - ContentType: contentType, - Timestamp: time.Now(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - if err != nil { - return errors.E(op, err) + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) } - - return nil } -func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error { - j.pipeline.Store(pipeline) +func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { + j.pipeline.Store(p) return nil } -func (j *JobConsumer) Run(p *pipeline.Pipeline) error { +func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") pipe := j.pipeline.Load().(*pipeline.Pipeline) @@ -353,7 +375,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error { return nil } -func (j *JobConsumer) Pause(p string) { +func (j *JobConsumer) Pause(ctx context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested pause on: ", p) @@ -391,7 +413,7 @@ func (j *JobConsumer) Pause(p string) { }) } -func (j *JobConsumer) Resume(p string) { +func (j *JobConsumer) Resume(ctx context.Context, p string) { pipe := j.pipeline.Load().(*pipeline.Pipeline) if pipe.Name() != p { j.log.Error("no such pipeline", "requested resume on: ", p) @@ -450,7 +472,7 @@ func (j *JobConsumer) Resume(p string) { }) } -func (j *JobConsumer) Stop() error { +func (j *JobConsumer) Stop(context.Context) error { j.stopCh <- struct{}{} pipe := j.pipeline.Load().(*pipeline.Pipeline) |