summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/rabbit.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/amqp/rabbit.go')
-rw-r--r--plugins/jobs/brokers/amqp/rabbit.go35
1 files changed, 10 insertions, 25 deletions
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go
index 41374878..7e722889 100644
--- a/plugins/jobs/brokers/amqp/rabbit.go
+++ b/plugins/jobs/brokers/amqp/rabbit.go
@@ -1,24 +1,23 @@
package amqp
import (
- "fmt"
-
- "github.com/google/uuid"
+ "github.com/spiral/errors"
"github.com/streadway/amqp"
)
-func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
+func (j *JobsConsumer) initRabbitMQ() error {
+ const op = errors.Op("rabbit_initmq")
// Channel opens a unique, concurrent server channel to process the bulk of AMQP
// messages. Any error from methods on this receiver will render the receiver
// invalid and a new Channel should be opened.
channel, err := j.conn.Channel()
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
err = channel.Qos(j.prefetchCount, 0, false)
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
// declare an exchange (idempotent operation)
@@ -32,12 +31,12 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
nil,
)
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
// verify or declare a queue
q, err := channel.QueueDeclare(
- fmt.Sprintf("%s.%s", j.routingKey, uuid.NewString()),
+ j.queue,
false,
false,
true,
@@ -45,7 +44,7 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
nil,
)
if err != nil {
- return nil, err
+ return errors.E(op, err)
}
// bind queue to the exchange
@@ -57,24 +56,10 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) {
nil,
)
if err != nil {
- return nil, err
- }
-
- // start reading messages from the channel
- deliv, err := channel.Consume(
- q.Name,
- "",
- false,
- false,
- false,
- false,
- nil,
- )
- if err != nil {
- return nil, err
+ return errors.E(op, err)
}
- return deliv, nil
+ return nil
}
func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) {