summaryrefslogtreecommitdiff
path: root/plugins/amqp/amqpjobs/rabbit_init.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-31 15:31:30 +0300
committerGitHub <[email protected]>2021-08-31 15:31:30 +0300
commit83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch)
tree884dd2991acf12826752632b8321410e7cc923ce /plugins/amqp/amqpjobs/rabbit_init.go
parent0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff)
parent31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff)
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
Diffstat (limited to 'plugins/amqp/amqpjobs/rabbit_init.go')
-rw-r--r--plugins/amqp/amqpjobs/rabbit_init.go57
1 files changed, 57 insertions, 0 deletions
diff --git a/plugins/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go
new file mode 100644
index 00000000..fb5f6911
--- /dev/null
+++ b/plugins/amqp/amqpjobs/rabbit_init.go
@@ -0,0 +1,57 @@
+package amqpjobs
+
+import (
+ "github.com/spiral/errors"
+)
+
+func (c *consumer) initRabbitMQ() error {
+ const op = errors.Op("jobs_plugin_rmq_init")
+ // Channel opens a unique, concurrent server channel to process the bulk of AMQP
+ // messages. Any error from methods on this receiver will render the receiver
+ // invalid and a new Channel should be opened.
+ channel, err := c.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // declare an exchange (idempotent operation)
+ err = channel.ExchangeDeclare(
+ c.exchangeName,
+ c.exchangeType,
+ true,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // verify or declare a queue
+ q, err := channel.QueueDeclare(
+ c.queue,
+ false,
+ false,
+ c.exclusive,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // bind queue to the exchange
+ err = channel.QueueBind(
+ q.Name,
+ c.routingKey,
+ c.exchangeName,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return channel.Close()
+}