summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/consumer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go115
1 files changed, 68 insertions, 47 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index b4e35d35..ccf6b2ea 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -143,68 +143,69 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
}
func (j *JobsConsumer) Push(job *structs.Job) error {
- const op = errors.Op("ephemeral_push")
- // lock needed here to re-create a connections and channels in case of error
+ const op = errors.Op("rabbitmq_push")
+ // check if the pipeline registered
+ if _, ok := j.pipelines.Load(job.Options.Pipeline); !ok {
+ return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
+ }
+
+ // lock needed here to protect redial concurrent operation
+ // we may be in the redial state here
j.RLock()
defer j.RUnlock()
// convert
msg := FromJob(job)
- // check if the pipeline registered
- if _, ok := j.pipelines.Load(job.Options.Pipeline); ok {
- // handle timeouts
- if job.Options.DelayDuration() > 0 {
- // TODO declare separate method for this if condition
-
- delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
- tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
-
- _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
- dlx: j.exchangeName,
- dlxRoutingKey: j.routingKey,
- dlxTTL: delayMs,
- dlxExpires: delayMs * 2,
- })
-
- if err != nil {
- panic(err)
- }
-
- err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
- if err != nil {
- panic(err)
- }
-
- // insert to the local, limited pipeline
- err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
- Headers: pack(job.Ident, 0, msg),
- ContentType: contentType,
- Timestamp: time.Now(),
- Body: nil,
- })
- if err != nil {
- panic(err)
- }
-
- return nil
+ // handle timeouts
+ if job.Options.DelayDuration() > 0 {
+ // TODO declare separate method for this if condition
+ delayMs := int64(job.Options.DelayDuration().Seconds() * 1000)
+ tmpQ := fmt.Sprintf("delayed-%d.%s.%s", delayMs, j.exchangeName, j.queue)
+
+ _, err := j.publishChan.QueueDeclare(tmpQ, true, false, false, false, amqp.Table{
+ dlx: j.exchangeName,
+ dlxRoutingKey: j.routingKey,
+ dlxTTL: delayMs,
+ dlxExpires: delayMs * 2,
+ })
+
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = j.publishChan.QueueBind(tmpQ, tmpQ, j.exchangeName, false, nil)
+ if err != nil {
+ return errors.E(op, err)
}
// insert to the local, limited pipeline
- err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
- Headers: pack(job.Ident, 0, msg),
+ err = j.publishChan.Publish(j.exchangeName, tmpQ, false, false, amqp.Publishing{
+ Headers: pack(job.Ident, msg),
ContentType: contentType,
Timestamp: time.Now(),
- Body: nil,
+ Body: msg.Body(),
})
+
if err != nil {
- panic(err)
+ return errors.E(op, err)
}
return nil
}
- return errors.E(op, errors.Errorf("no such pipeline: %s", job.Options.Pipeline))
+ // insert to the local, limited pipeline
+ err := j.publishChan.Publish(j.exchangeName, j.routingKey, false, false, amqp.Publishing{
+ Headers: pack(job.Ident, msg),
+ ContentType: contentType,
+ Timestamp: time.Now(),
+ Body: msg.Body(),
+ })
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
}
func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
@@ -220,11 +221,14 @@ func (j *JobsConsumer) Register(pipeline *pipeline.Pipeline) error {
func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error {
const op = errors.Op("rabbit_consume")
-
if _, ok := j.pipelines.Load(pipeline.Name()); !ok {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipeline.Name()))
}
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
var err error
j.consumeChan, err = j.conn.Channel()
if err != nil {
@@ -256,8 +260,16 @@ func (j *JobsConsumer) Consume(pipeline *pipeline.Pipeline) error {
return nil
}
-func (j *JobsConsumer) List() []*pipeline.Pipeline {
- panic("implement me")
+func (j *JobsConsumer) List() []string {
+ out := make([]string, 0, 2)
+
+ j.pipelines.Range(func(key, value interface{}) bool {
+ pipe := key.(string)
+ out = append(out, pipe)
+ return true
+ })
+
+ return out
}
func (j *JobsConsumer) Pause(pipeline string) {
@@ -268,6 +280,10 @@ func (j *JobsConsumer) Pause(pipeline string) {
}
}
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
err := j.publishChan.Cancel(j.consumeID, true)
if err != nil {
j.logger.Error("cancel publish channel, forcing close", "error", err)
@@ -284,6 +300,11 @@ func (j *JobsConsumer) Resume(pipeline string) {
// mark pipeline as turned off
j.pipelines.Store(pipeline, true)
}
+
+ // protect connection (redial)
+ j.Lock()
+ defer j.Unlock()
+
var err error
j.consumeChan, err = j.conn.Channel()
if err != nil {