summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go156
1 files changed, 89 insertions, 67 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 8c55399c..714a714a 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -1,6 +1,7 @@
package amqp
import (
+ "context"
"fmt"
"sync"
"sync/atomic"
@@ -28,12 +29,18 @@ type JobConsumer struct {
// amqp connection
conn *amqp.Connection
consumeChan *amqp.Channel
- publishChan *amqp.Channel
+ publishChan chan *amqp.Channel
consumeID string
connStr string
- retryTimeout time.Duration
- prefetch int
+ retryTimeout time.Duration
+ //
+ // prefetch QoS AMQP
+ //
+ prefetch int
+ //
+ // pipeline's priority
+ //
priority int64
exchangeName string
queue string
@@ -95,6 +102,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
delayCache: make(map[string]struct{}, 100),
priority: pipeCfg.Priority,
+ publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeCfg.RoutingKey,
queue: pipeCfg.Queue,
exchangeType: pipeCfg.ExchangeType,
@@ -118,11 +126,13 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return nil, errors.E(op, err)
}
- jb.publishChan, err = jb.conn.Channel()
+ pch, err := jb.conn.Channel()
if err != nil {
return nil, errors.E(op, err)
}
+ jb.publishChan <- pch
+
// run redialer for the connection
jb.redialer()
@@ -161,6 +171,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
retryTimeout: time.Minute * 5,
delayCache: make(map[string]struct{}, 100),
+ publishChan: make(chan *amqp.Channel, 1),
routingKey: pipeline.String(routingKey, ""),
queue: pipeline.String(queue, "default"),
exchangeType: pipeline.String(exchangeType, "direct"),
@@ -185,14 +196,16 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return nil, errors.E(op, err)
}
- jb.publishChan, err = jb.conn.Channel()
+ pch, err := jb.conn.Channel()
if err != nil {
return nil, errors.E(op, err)
}
+ jb.publishChan <- pch
+
// register the pipeline
// error here is always nil
- _ = jb.Register(pipeline)
+ _ = jb.Register(context.Background(), pipeline)
// run redialer for the connection
jb.redialer()
@@ -200,7 +213,7 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
return jb, nil
}
-func (j *JobConsumer) Push(job *job.Job) error {
+func (j *JobConsumer) Push(ctx context.Context, job *job.Job) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered
@@ -212,32 +225,69 @@ func (j *JobConsumer) Push(job *job.Job) error {
// lock needed here to protect redial concurrent operation
// we may be in the redial state here
- j.Lock()
- defer j.Unlock()
- // convert
- msg := fromJob(job)
- p, err := pack(job.Ident, msg)
- if err != nil {
- return errors.E(op, err)
- }
+ select {
+ case pch := <-j.publishChan:
+ // return the channel back
+ defer func() {
+ j.publishChan <- pch
+ }()
+
+ // convert
+ msg := fromJob(job)
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // handle timeouts
+ if msg.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+
+ // delay cache optimization.
+ // If user already declared a queue with a delay, do not redeclare and rebind the queue
+ // Before -> 2.5k RPS with redeclaration
+ // After -> 30k RPS
+ if _, exists := j.delayCache[tmpQ]; exists {
+ // insert to the local, limited pipeline
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: p,
+ ContentType: contentType,
+ Timestamp: time.Now().UTC(),
+ DeliveryMode: amqp.Persistent,
+ Body: msg.Body(),
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+ }
+
+ _, err = pch.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = pch.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
+ }
- // handle timeouts
- if msg.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
- delayMs := int64(msg.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-
- // delay cache optimization.
- // If user already declared a queue with a delay, do not redeclare and rebind the queue
- // Before -> 2.5k RPS with redeclaration
- // After -> 30k RPS
- if _, exists := j.delayCache[tmpQ]; exists {
// insert to the local, limited pipeline
- err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
Headers: p,
ContentType: contentType,
- Timestamp: time.Now(),
+ Timestamp: time.Now().UTC(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
@@ -246,64 +296,36 @@ func (j *JobConsumer) Push(job *job.Job) error {
return errors.E(op, err)
}
- return nil
- }
-
- _, err = j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
+ j.delayCache[tmpQ] = struct{}{}
- if err != nil {
- return errors.E(op, err)
- }
-
- err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- return errors.E(op, err)
+ return nil
}
// insert to the local, limited pipeline
- err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ err = pch.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
Headers: p,
ContentType: contentType,
Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
Body: msg.Body(),
})
-
if err != nil {
return errors.E(op, err)
}
- j.delayCache[tmpQ] = struct{}{}
-
return nil
- }
- // insert to the local, limited pipeline
- err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: p,
- ContentType: contentType,
- Timestamp: time.Now(),
- DeliveryMode: amqp.Persistent,
- Body: msg.Body(),
- })
- if err != nil {
- return errors.E(op, err)
+ case <-ctx.Done():
+ return errors.E(op, errors.TimeOut, ctx.Err())
}
-
- return nil
}
-func (j *JobConsumer) Register(pipeline *pipeline.Pipeline) error {
- j.pipeline.Store(pipeline)
+func (j *JobConsumer) Register(_ context.Context, p *pipeline.Pipeline) error {
+ j.pipeline.Store(p)
return nil
}
-func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
+func (j *JobConsumer) Run(ctx context.Context, p *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
pipe := j.pipeline.Load().(*pipeline.Pipeline)
@@ -353,7 +375,7 @@ func (j *JobConsumer) Run(p *pipeline.Pipeline) error {
return nil
}
-func (j *JobConsumer) Pause(p string) {
+func (j *JobConsumer) Pause(ctx context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested pause on: ", p)
@@ -391,7 +413,7 @@ func (j *JobConsumer) Pause(p string) {
})
}
-func (j *JobConsumer) Resume(p string) {
+func (j *JobConsumer) Resume(ctx context.Context, p string) {
pipe := j.pipeline.Load().(*pipeline.Pipeline)
if pipe.Name() != p {
j.log.Error("no such pipeline", "requested resume on: ", p)
@@ -450,7 +472,7 @@ func (j *JobConsumer) Resume(p string) {
})
}
-func (j *JobConsumer) Stop() error {
+func (j *JobConsumer) Stop(context.Context) error {
j.stopCh <- struct{}{}
pipe := j.pipeline.Load().(*pipeline.Pipeline)