summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/rabbit_init.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-14 11:35:12 +0300
committerValery Piashchynski <[email protected]>2021-07-14 11:35:12 +0300
commitd099e47ab28dd044d34e18347a4c714b8af3d612 (patch)
treee106e13bba48e435b87d218237b282d7f691b52c /plugins/jobs/drivers/amqp/rabbit_init.go
parentec7c049036d31fe030d106db9f0d268ea0296c5f (diff)
SQS driver.
Fix isssues in the AMQP driver. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/rabbit_init.go')
-rw-r--r--plugins/jobs/drivers/amqp/rabbit_init.go65
1 files changed, 65 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go
new file mode 100644
index 00000000..d6b8a708
--- /dev/null
+++ b/plugins/jobs/drivers/amqp/rabbit_init.go
@@ -0,0 +1,65 @@
+package amqp
+
+import (
+ "time"
+
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/events"
+)
+
+func (j *JobsConsumer) initRabbitMQ() error {
+ const op = errors.Op("jobs_plugin_rmq_init")
+ // Channel opens a unique, concurrent server channel to process the bulk of AMQP
+ // messages. Any error from methods on this receiver will render the receiver
+ // invalid and a new Channel should be opened.
+ channel, err := j.conn.Channel()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // declare an exchange (idempotent operation)
+ err = channel.ExchangeDeclare(
+ j.exchangeName,
+ j.exchangeType,
+ true,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // verify or declare a queue
+ q, err := channel.QueueDeclare(
+ j.queue,
+ false,
+ false,
+ j.exclusive,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // bind queue to the exchange
+ err = channel.QueueBind(
+ q.Name,
+ j.routingKey,
+ j.exchangeName,
+ false,
+ nil,
+ )
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ j.eh.Push(events.JobEvent{
+ Event: events.EventInitialized,
+ Driver: "amqp",
+ Start: time.Now(),
+ })
+ return channel.Close()
+}