summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go24
1 files changed, 16 insertions, 8 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 4a85ed01..2d0d591c 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -8,7 +8,7 @@ import (
"github.com/google/uuid"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/common/jobs"
- "github.com/spiral/roadrunner/v2/pkg/priorityqueue"
+ priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/jobs/structs"
@@ -128,12 +128,12 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
err = jb.initRabbitMQ()
if err != nil {
- return nil, err
+ return nil, errors.E(op, err)
}
jb.publishChan, err = jb.conn.Channel()
if err != nil {
- panic(err)
+ return nil, errors.E(op, err)
}
// run redialer for the connection
@@ -151,8 +151,8 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// lock needed here to protect redial concurrent operation
// we may be in the redial state here
- j.RLock()
- defer j.RUnlock()
+ j.Lock()
+ defer j.Unlock()
// convert
msg := FromJob(job)
@@ -179,9 +179,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return errors.E(op, err)
}
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: pack(job.Ident, msg),
+ Headers: p,
ContentType: contentType,
Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,
@@ -195,9 +199,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
return nil
}
+ p, err := pack(job.Ident, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
// insert to the local, limited pipeline
- err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: pack(job.Ident, msg),
+ err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: p,
ContentType: contentType,
Timestamp: time.Now(),
DeliveryMode: amqp.Persistent,