diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 118 |
1 files changed, 68 insertions, 50 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index 3ca5c742..36a16bcd 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -54,6 +54,7 @@ type JobConsumer struct { listeners uint32 stopCh chan struct{} + requeueCh chan *Item } // NewAMQPConsumer initializes rabbitmq pipeline @@ -111,6 +112,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, exclusive: pipeCfg.Exclusive, multipleAck: pipeCfg.MultipleAck, requeueOnFail: pipeCfg.RequeueOnFail, + requeueCh: make(chan *Item, 1000), } jb.conn, err = amqp.Dial(globalCfg.Addr) @@ -133,8 +135,9 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, jb.publishChan <- pch - // run redialer for the connection + // run redialer and requeue listener for the connection jb.redialer() + jb.requeueListener() return jb, nil } @@ -181,6 +184,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con exclusive: pipeline.Bool(exclusive, true), multipleAck: pipeline.Bool(multipleAsk, false), requeueOnFail: pipeline.Bool(requeueOnFail, false), + requeueCh: make(chan *Item, 1000), } jb.conn, err = amqp.Dial(globalCfg.Addr) @@ -209,6 +213,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // run redialer for the connection jb.redialer() + jb.requeueListener() return jb, nil } @@ -240,52 +245,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return errors.E(op, err) } - // handle timeouts - if msg.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - - // delay cache optimization. - // If user already declared a queue with a delay, do not redeclare and rebind the queue - // Before -> 2.5k RPS with redeclaration - // After -> 30k RPS - if _, exists := j.delayCache[tmpQ]; exists { - // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: p, - ContentType: contentType, - Timestamp: time.Now().UTC(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - return nil - } - - _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) + err = j.handleItem(msg, p, pch) + if err != nil { + return errors.E(op, err) + } - if err != nil { - return errors.E(op, err) - } + return nil - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - return errors.E(op, err) - } + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) + } +} +// handleItem +func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error { + const op = errors.Op("amqp_handle_item") + // handle timeouts + if msg.Options.DelayDuration() > 0 { + // TODO declare separate method for this if condition + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + + // delay cache optimization. + // If user already declared a queue with a delay, do not redeclare and rebind the queue + // Before -> 2.5k RPS with redeclaration + // After -> 30k RPS + if _, exists := j.delayCache[tmpQ]; exists { // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: p, + err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: table, ContentType: contentType, Timestamp: time.Now().UTC(), DeliveryMode: amqp.Persistent, @@ -296,28 +284,56 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return errors.E(op, err) } - j.delayCache[tmpQ] = struct{}{} - return nil } + _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + if err != nil { + return errors.E(op, err) + } + + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + return errors.E(op, err) + } + // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: p, + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: table, ContentType: contentType, - Timestamp: time.Now(), + Timestamp: time.Now().UTC(), DeliveryMode: amqp.Persistent, Body: msg.Body(), }) + if err != nil { return errors.E(op, err) } + j.delayCache[tmpQ] = struct{}{} + return nil + } - case <-ctx.Done(): - return errors.E(op, errors.TimeOut, ctx.Err()) + // insert to the local, limited pipeline + err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: table, + ContentType: contentType, + Timestamp: time.Now(), + DeliveryMode: amqp.Persistent, + Body: msg.Body(), + }) + + if err != nil { + return errors.E(op, err) } + + return nil } func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { @@ -475,6 +491,8 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { func (j *JobConsumer) Stop(context.Context) error { j.stopCh <- struct{}{} + close(j.requeueCh) + pipe := j.pipeline.Load().(*pipeline.Pipeline) j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, |