summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-10 01:38:36 +0300
committerValery Piashchynski <[email protected]>2021-07-10 01:39:09 +0300
commitfa57fa609d14e4ebf4cbffc154804402906eecaa (patch)
tree22bb3ab2c1f9ea9bd8f54f76d471f9bfc556ff75 /plugins/jobs/brokers/amqp/consumer.go
parent242022e4a64acef9eda87fa1b8b3b026d693b332 (diff)
Properly parse amqp items (jobs).
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go15
1 files changed, 10 insertions, 5 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 9ac47269..b4e35d35 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -144,22 +144,27 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
func (j *JobsConsumer) Push(job *structs.Job) error {
const op = errors.Op("ephemeral_push")
+ // lock needed here to re-create a connections and channels in case of error
j.RLock()
defer j.RUnlock()
+ // convert
+ msg := FromJob(job)
+
// check if the pipeline registered
if _, ok := j.pipelines.Load(job.Options.Pipeline); ok {
// handle timeouts
if job.Options.DelayDuration() > 0 {
- // pub
+ // TODO declare separate method for this if condition
+
delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
_, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
dlx: j.exchangeName,
dlxRoutingKey: j.routingKey,
- dlxTTL: 100,
- dlxExpires: 200,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
})
if err != nil {
@@ -173,7 +178,7 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// insert to the local, limited pipeline
err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: pack(job.Ident, 0, job),
+ Headers: pack(job.Ident, 0, msg),
ContentType: contentType,
Timestamp: time.Now(),
Body: nil,
@@ -187,7 +192,7 @@ func (j *JobsConsumer) Push(job *structs.Job) error {
// insert to the local, limited pipeline
err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- //Headers: pack(job.Ident, 0, job),
+ Headers: pack(job.Ident, 0, msg),
ContentType: contentType,
Timestamp: time.Now(),
Body: nil,