diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 24 |
1 files changed, 16 insertions, 8 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 4a85ed01..2d0d591c 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -8,7 +8,7 @@ import ( "github.com/google/uuid" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/common/jobs" - "github.com/spiral/roadrunner/v2/pkg/priorityqueue" + priorityqueue "github.com/spiral/roadrunner/v2/pkg/priority_queue" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/jobs/structs" @@ -128,12 +128,12 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, err = jb.initRabbitMQ() if err != nil { - return nil, err + return nil, errors.E(op, err) } jb.publishChan, err = jb.conn.Channel() if err != nil { - panic(err) + return nil, errors.E(op, err) } // run redialer for the connection @@ -151,8 +151,8 @@ func (j *JobsConsumer) Push(job *structs.Job) error { // lock needed here to protect redial concurrent operation // we may be in the redial state here - j.RLock() - defer j.RUnlock() + j.Lock() + defer j.Unlock() // convert msg := FromJob(job) @@ -179,9 +179,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return errors.E(op, err) } + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } // insert to the local, limited pipeline err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: pack(job.Ident, msg), + Headers: p, ContentType: contentType, Timestamp: time.Now(), DeliveryMode: amqp.Persistent, @@ -195,9 +199,13 @@ func (j *JobsConsumer) Push(job *structs.Job) error { return nil } + p, err := pack(job.Ident, msg) + if err != nil { + return errors.E(op, err) + } // insert to the local, limited pipeline - err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: pack(job.Ident, msg), + err = j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: p, ContentType: contentType, Timestamp: time.Now(), DeliveryMode: amqp.Persistent, |