summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go118
1 files changed, 68 insertions, 50 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 3ca5c742..36a16bcd 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -54,6 +54,7 @@ type JobConsumer struct {
listeners uint32
stopCh chan struct{}
+ requeueCh chan *Item
}
// NewAMQPConsumer initializes rabbitmq pipeline
@@ -111,6 +112,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
exclusive: pipeCfg.Exclusive,
multipleAck: pipeCfg.MultipleAck,
requeueOnFail: pipeCfg.RequeueOnFail,
+ requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -133,8 +135,9 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
jb.publishChan <- pch
- // run redialer for the connection
+ // run redialer and requeue listener for the connection
jb.redialer()
+ jb.requeueListener()
return jb, nil
}
@@ -181,6 +184,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
exclusive: pipeline.Bool(exclusive, true),
multipleAck: pipeline.Bool(multipleAsk, false),
requeueOnFail: pipeline.Bool(requeueOnFail, false),
+ requeueCh: make(chan *Item, 1000),
}
jb.conn, err = amqp.Dial(globalCfg.Addr)
@@ -209,6 +213,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// run redialer for the connection
jb.redialer()
+ jb.requeueListener()
return jb, nil
}
@@ -240,52 +245,35 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return errors.E(op, err)
}
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-
- // delay cache optimization.
- // If user already declared a queue with a delay, do not redeclare and rebind the queue
- // Before -> 2.5k RPS with redeclaration
- // After -> 30k RPS
- if _, exists := j.delayCache[tmpQ]; exists {
- // insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: p,
- ContentType: contentType,
- Timestamp: time.Now().UTC(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
-
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- }
-
- _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
+ err = j.handleItem(msg, p, pch)
+ if err != nil {
+ return errors.E(op, err)
+ }
- if err != nil {
- return errors.E(op, err)
- }
+ return nil
- err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
- }
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
+ }
+}
+// handleItem
+func (j *JobConsumer) handleItem(msg *Item, table amqp.Table, pch *amqp.Channel) error {
+ const op = errors.Op("amqp_handle_item")
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+
+ // delay cache optimization.
+ // If user already declared a queue with a delay, do not redeclare and rebind the queue
+ // Before -> 2.5k RPS with redeclaration
+ // After -> 30k RPS
+ if _, exists := j.delayCache[tmpQ]; exists {
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: p,
+ err := pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
ContentType: contentType,
Timestamp: time.Now().UTC(),
DeliveryMode: amqp.Persistent,
@@ -296,28 +284,56 @@ func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
return errors.E(op, err)
}
- j.delayCache[tmpQ] = struct{}{}
-
return nil
}
+ _, err := pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
// insert to the local, limited pipeline
- err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: p,
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: table,
ContentType: contentType,
- Timestamp: time.Now(),
+ Timestamp: time.Now().UTC(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
+
if err != nil {
return errors.E(op, err)
}
+ j.delayCache[tmpQ] = struct{}{}
+
return nil
+ }
- case <-ctx.Done():
- return errors.E(op, errors.TimeOut, ctx.Err())
+ // insert to the local, limited pipeline
+ err := pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: table,
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
}
+
+ return nil
}
func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
@@ -475,6 +491,8 @@ func (j *JobConsumer) Resume(_ context.Context, p string) {
func (j *JobConsumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
+ close(j.requeueCh)
+
pipe := j.pipeline.Load().(*pipeline.Pipeline)
j.eh.Push(events.JobEvent{
Event: events.EventPipeStopped,