diff options
author | Valery Piashchynski <[email protected]> | 2021-07-10 01:38:36 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-10 01:39:09 +0300 |
commit | fa57fa609d14e4ebf4cbffc154804402906eecaa (patch) | |
tree | 22bb3ab2c1f9ea9bd8f54f76d471f9bfc556ff75 /plugins/jobs/brokers/amqp/consumer.go | |
parent | 242022e4a64acef9eda87fa1b8b3b026d693b332 (diff) |
Properly parse amqp items (jobs).
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 9ac47269..b4e35d35 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -144,22 +144,27 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, func (j *JobsConsumer) Push(job *structs.Job) error { const op = errors.Op("ephemeral_push") + // lock needed here to re-create a connections and channels in case of error j.RLock() defer j.RUnlock() + // convert + msg := FromJob(job) + // check if the pipeline registered if _, ok := j.pipelines.Load(job.Options.Pipeline); ok { // handle timeouts if job.Options.DelayDuration() > 0 { - // pub + // TODO declare separate method for this if condition + delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ dlx: j.exchangeName, dlxRoutingKey: j.routingKey, - dlxTTL: 100, - dlxExpires: 200, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, }) if err != nil { @@ -173,7 +178,7 @@ func (j *JobsConsumer) Push(job *structs.Job) error { // insert to the local, limited pipeline err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: pack(job.Ident, 0, job), + Headers: pack(job.Ident, 0, msg), ContentType: contentType, Timestamp: time.Now(), Body: nil, @@ -187,7 +192,7 @@ func (j *JobsConsumer) Push(job *structs.Job) error { // insert to the local, limited pipeline err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - //Headers: pack(job.Ident, 0, job), + Headers: pack(job.Ident, 0, msg), ContentType: contentType, Timestamp: time.Now(), Body: nil, |