diff options
author | Valery Piashchynski <[email protected]> | 2021-07-14 11:35:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-14 11:35:12 +0300 |
commit | d099e47ab28dd044d34e18347a4c714b8af3d612 (patch) | |
tree | e106e13bba48e435b87d218237b282d7f691b52c /plugins/jobs/drivers/amqp/rabbit_init.go | |
parent | ec7c049036d31fe030d106db9f0d268ea0296c5f (diff) |
SQS driver.
Fix isssues in the AMQP driver.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/rabbit_init.go')
-rw-r--r-- | plugins/jobs/drivers/amqp/rabbit_init.go | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/plugins/jobs/drivers/amqp/rabbit_init.go b/plugins/jobs/drivers/amqp/rabbit_init.go new file mode 100644 index 00000000..d6b8a708 --- /dev/null +++ b/plugins/jobs/drivers/amqp/rabbit_init.go @@ -0,0 +1,65 @@ +package amqp + +import ( + "time" + + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/events" +) + +func (j *JobsConsumer) initRabbitMQ() error { + const op = errors.Op("jobs_plugin_rmq_init") + // Channel opens a unique, concurrent server channel to process the bulk of AMQP + // messages. Any error from methods on this receiver will render the receiver + // invalid and a new Channel should be opened. + channel, err := j.conn.Channel() + if err != nil { + return errors.E(op, err) + } + + // declare an exchange (idempotent operation) + err = channel.ExchangeDeclare( + j.exchangeName, + j.exchangeType, + true, + false, + false, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // verify or declare a queue + q, err := channel.QueueDeclare( + j.queue, + false, + false, + j.exclusive, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + // bind queue to the exchange + err = channel.QueueBind( + q.Name, + j.routingKey, + j.exchangeName, + false, + nil, + ) + if err != nil { + return errors.E(op, err) + } + + j.eh.Push(events.JobEvent{ + Event: events.EventInitialized, + Driver: "amqp", + Start: time.Now(), + }) + return channel.Close() +} |