diff options
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 108 |
1 files changed, 38 insertions, 70 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index d7425858..429953e1 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -54,7 +54,6 @@ type JobConsumer struct { listeners uint32 stopCh chan struct{} - requeueCh chan *Item } // NewAMQPConsumer initializes rabbitmq pipeline @@ -112,7 +111,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, exclusive: pipeCfg.Exclusive, multipleAck: pipeCfg.MultipleAck, requeueOnFail: pipeCfg.RequeueOnFail, - requeueCh: make(chan *Item, 1000), } jb.conn, err = amqp.Dial(globalCfg.Addr) @@ -137,7 +135,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // run redialer and requeue listener for the connection jb.redialer() - jb.requeueListener() return jb, nil } @@ -184,7 +181,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con exclusive: pipeline.Bool(exclusive, false), multipleAck: pipeline.Bool(multipleAsk, false), requeueOnFail: pipeline.Bool(requeueOnFail, false), - requeueCh: make(chan *Item, 1000), } jb.conn, err = amqp.Dial(globalCfg.Addr) @@ -213,7 +209,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con // run redialer for the connection jb.redialer() - jb.requeueListener() return jb, nil } @@ -228,9 +223,17 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Options.Pipeline, pipe.Name())) } - // lock needed here to protect redial concurrent operation - // we may be in the redial state here + err := j.handleItem(ctx, fromJob(job)) + if err != nil { + return errors.E(op, err) + } + + return nil +} +// handleItem +func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { + const op = errors.Op("rabbitmq_handle_item") select { case pch := <-j.publishChan: // return the channel back @@ -239,40 +242,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error { }() // convert - msg := fromJob(job) - p, err := pack(job.Ident, msg) - if err != nil { - return errors.E(op, err) - } - - err = j.handleItem(msg, p, pch) + table, err := pack(msg.ID(), msg) if err != nil { return errors.E(op, err) } - return nil + const op = errors.Op("amqp_handle_item") + // handle timeouts + if msg.Options.DelayDuration() > 0 { + // TODO declare separate method for this if condition + // TODO dlx cache channel?? + delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + if err != nil { + return errors.E(op, err) + } - case <-ctx.Done(): - return errors.E(op, errors.TimeOut, ctx.Err()) - } -} + err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + return errors.E(op, err) + } -// handleItem -func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error { - const op = errors.Op("amqp_handle_item") - // handle timeouts - if msg.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - - // delay cache optimization. - // If user already declared a queue with a delay, do not redeclare and rebind the queue - // Before -> 2.5k RPS with redeclaration - // After -> 30k RPS - if _, exists := j.delayCache[tmpQ]; exists { // insert to the local, limited pipeline - err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, Timestamp: time.Now().UTC(), @@ -284,29 +282,16 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) return errors.E(op, err) } - return nil - } - - _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - if err != nil { - return errors.E(op, err) - } + j.delayCache[tmpQ] = struct{}{} - err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - return errors.E(op, err) + return nil } // insert to the local, limited pipeline - err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ Headers: table, ContentType: contentType, - Timestamp: time.Now().UTC(), + Timestamp: time.Now(), DeliveryMode: amqp.Persistent, Body: msg.Body(), }) @@ -315,25 +300,10 @@ func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) return errors.E(op, err) } - j.delayCache[tmpQ] = struct{}{} - return nil + case <-ctx.Done(): + return errors.E(op, errors.TimeOut, ctx.Err()) } - - // insert to the local, limited pipeline - err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: table, - ContentType: contentType, - Timestamp: time.Now(), - DeliveryMode: amqp.Persistent, - Body: msg.Body(), - }) - - if err != nil { - return errors.E(op, err) - } - - return nil } func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error { @@ -491,8 +461,6 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { func (j *JobConsumer) Stop(context.Context) error { j.stopCh <- struct{}{} - close(j.requeueCh) - pipe := j.pipeline.Load().(*pipeline.Pipeline) j.eh.Push(events.JobEvent{ Event: events.EventPipeStopped, |