diff options
author | Valery Piashchynski <[email protected]> | 2021-08-31 15:31:30 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-08-31 15:31:30 +0300 |
commit | 83e7bc6afbc2e523a95cf9dcb8b25cf5f7ba3f1e (patch) | |
tree | 884dd2991acf12826752632b8321410e7cc923ce /plugins/amqp/amqpjobs/rabbit_init.go | |
parent | 0a66fae4196c5abab2fdf1400f0b200f8a307b31 (diff) | |
parent | 31cf040029eb0b26278e4a9948cbc1aba77ed58b (diff) |
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
#770: feat(`driver,jobs`): local persistent driver based on the `boltdb`, #772: fix(`worker_watcher`): bug with failed worker while TTL-ed
Diffstat (limited to 'plugins/amqp/amqpjobs/rabbit_init.go')
-rw-r--r-- | plugins/amqp/amqpjobs/rabbit_init.go | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/plugins/amqp/amqpjobs/rabbit_init.go b/plugins/amqp/amqpjobs/rabbit_init.go new file mode 100644 index 00000000..fb5f6911 --- /dev/null +++ b/plugins/amqp/amqpjobs/rabbit_init.go @@ -0,0 +1,57 @@ +package amqpjobs + +import ( + "github.com/spiral/errors" +) + +func (c *consumer) initRabbitMQ() error { + const op = errors.Op("jobs_plugin_rmq_init") + // Channel opens a unique, concurrent server channel to process the bulk of AMQP + // messages. Any error from methods on this receiver will render the receiver + // invalid and a new Channel should be opened. + channel, err := c.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + // declare an exchange (idempotent operation) + err = channel.ExchangeDeclare( + c.exchangeName, + c.exchangeType, + true, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // verify or declare a queue + q, err := channel.QueueDeclare( + c.queue, + false, + false, + c.exclusive, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // bind queue to the exchange + err = channel.QueueBind( + q.Name, + c.routingKey, + c.exchangeName, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + return channel.Close() +} |