diff options
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 115 |
1 files changed, 68 insertions, 47 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index b4e35d35..ccf6b2ea 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -143,68 +143,69 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, } func (j *JobsConsumer) Push(job *structs.Job) error { - const op = errors.Op("ephemeral_push") - // lock needed here to re-create a connections and channels in case of error + const op = errors.Op("rabbitmq_push") + // check if the pipeline registered + if _, ok := j.pipelines.Load(job.Options.Pipeline); !ok { + return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) + } + + // lock needed here to protect redial concurrent operation + // we may be in the redial state here j.RLock() defer j.RUnlock() // convert msg := FromJob(job) - // check if the pipeline registered - if _, ok := j.pipelines.Load(job.Options.Pipeline); ok { - // handle timeouts - if job.Options.DelayDuration() > 0 { - // TODO declare separate method for this if condition - - delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) - tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) - - _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ - dlx: j.exchangeName, - dlxRoutingKey: j.routingKey, - dlxTTL: delayMs, - dlxExpires: delayMs * 2, - }) - - if err != nil { - panic(err) - } - - err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) - if err != nil { - panic(err) - } - - // insert to the local, limited pipeline - err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ - Headers: pack(job.Ident, 0, msg), - ContentType: contentType, - Timestamp: time.Now(), - Body: nil, - }) - if err != nil { - panic(err) - } - - return nil + // handle timeouts + if job.Options.DelayDuration() > 0 { + // TODO declare separate method for this if condition + delayMs := int64(job.Options.DelayDuration().Seconds() * 1000) + tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue) + + _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{ + dlx: j.exchangeName, + dlxRoutingKey: j.routingKey, + dlxTTL: delayMs, + dlxExpires: delayMs * 2, + }) + + if err != nil { + return errors.E(op, err) + } + + err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil) + if err != nil { + return errors.E(op, err) } // insert to the local, limited pipeline - err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ - Headers: pack(job.Ident, 0, msg), + err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{ + Headers: pack(job.Ident, msg), ContentType: contentType, Timestamp: time.Now(), - Body: nil, + Body: msg.Body(), }) + if err != nil { - panic(err) + return errors.E(op, err) } return nil } - return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline)) + // insert to the local, limited pipeline + err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{ + Headers: pack(job.Ident, msg), + ContentType: contentType, + Timestamp: time.Now(), + Body: msg.Body(), + }) + if err != nil { + return errors.E(op, err) + } + + return nil } func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { @@ -220,11 +221,14 @@ func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error { func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error { const op = errors.Op("rabbit_consume") - if _, ok := j.pipelines.Load(pipeline.Name()); !ok { return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name())) } + // protect connection (redial) + j.Lock() + defer j.Unlock() + var err error j.consumeChan, err = j.conn.Channel() if err != nil { @@ -256,8 +260,16 @@ func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error { return nil } -func (j *JobsConsumer) List() []*pipeline.Pipeline { - panic("implement me") +func (j *JobsConsumer) List() []string { + out := make([]string, 0, 2) + + j.pipelines.Range(func(key, value interface{}) bool { + pipe := key.(string) + out = append(out, pipe) + return true + }) + + return out } func (j *JobsConsumer) Pause(pipeline string) { @@ -268,6 +280,10 @@ func (j *JobsConsumer) Pause(pipeline string) { } } + // protect connection (redial) + j.Lock() + defer j.Unlock() + err := j.publishChan.Cancel(j.consumeID, true) if err != nil { j.logger.Error("cancel publish channel, forcing close", "error", err) @@ -284,6 +300,11 @@ func (j *JobsConsumer) Resume(pipeline string) { // mark pipeline as turned off j.pipelines.Store(pipeline, true) } + + // protect connection (redial) + j.Lock() + defer j.Unlock() + var err error j.consumeChan, err = j.conn.Channel() if err != nil { |