diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/rabbit.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/rabbit.go | 35 |
1 files changed, 10 insertions, 25 deletions
diff --git a/plugins/jobs/brokers/amqp/rabbit.go b/plugins/jobs/brokers/amqp/rabbit.go index 41374878..7e722889 100644 --- a/plugins/jobs/brokers/amqp/rabbit.go +++ b/plugins/jobs/brokers/amqp/rabbit.go @@ -1,24 +1,23 @@ package amqp import ( - "fmt" - - "github.com/google/uuid" + "github.com/spiral/errors" "github.com/streadway/amqp" ) -func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { +func (j *JobsConsumer) initRabbitMQ() error { + const op = errors.Op("rabbit_initmq") // Channel opens a unique, concurrent server channel to process the bulk of AMQP // messages. Any error from methods on this receiver will render the receiver // invalid and a new Channel should be opened. channel, err := j.conn.Channel() if err != nil { - return nil, err + return errors.E(op, err) } err = channel.Qos(j.prefetchCount, 0, false) if err != nil { - return nil, err + return errors.E(op, err) } // declare an exchange (idempotent operation) @@ -32,12 +31,12 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { nil, ) if err != nil { - return nil, err + return errors.E(op, err) } // verify or declare a queue q, err := channel.QueueDeclare( - fmt.Sprintf("%s.%s", j.routingKey, uuid.NewString()), + j.queue, false, false, true, @@ -45,7 +44,7 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { nil, ) if err != nil { - return nil, err + return errors.E(op, err) } // bind queue to the exchange @@ -57,24 +56,10 @@ func (j *JobsConsumer) initRabbitMQ() (<-chan amqp.Delivery, error) { nil, ) if err != nil { - return nil, err - } - - // start reading messages from the channel - deliv, err := channel.Consume( - q.Name, - "", - false, - false, - false, - false, - nil, - ) - if err != nil { - return nil, err + return errors.E(op, err) } - return deliv, nil + return nil } func (j *JobsConsumer) listener(deliv <-chan amqp.Delivery) { |