summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go')
-rw-r--r--plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go57
1 files changed, 57 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
new file mode 100644
index 00000000..e260fabe
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/amqpjobs/rabbit_init.go
@@ -0,0 +1,57 @@
+package amqpjobs
+
+import (
+ "github.com/spiral/errors"
+)
+
+func (j *consumer) initRabbitMQ() error {
+ const op = errors.Op("jobs_plugin_rmq_init")
+ // Channel opens a unique, concurrent server channel to process the bulk of AMQP
+ // messages. Any error from methods on this receiver will render the receiver
+ // invalid and a new Channel should be opened.
+ channel, err := j.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // declare an exchange (idempotent operation)
+ err = channel.ExchangeDeclare(
+ j.exchangeName,
+ j.exchangeType,
+ true,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // verify or declare a queue
+ q, err := channel.QueueDeclare(
+ j.queue,
+ false,
+ false,
+ j.exclusive,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // bind queue to the exchange
+ err = channel.QueueBind(
+ q.Name,
+ j.routingKey,
+ j.exchangeName,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return channel.Close()
+}