diff options
author | Valery Piashchynski <[email protected]> | 2021-07-12 08:47:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-12 08:47:33 +0300 |
commit | e82e9248bb1afd5e571f465ac79ac7f5f79b81f1 (patch) | |
tree | 098a9827f51255916f99160b02098153f8d0238e /plugins/jobs/brokers/amqp/consumer.go | |
parent | 0f70f1e2311640236d74a0a237536779d8d44223 (diff) |
Finish dynamic declaration of the pipelines. Fix issue with
configuration parsing in the AMQP consumer.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r-- | plugins/jobs/brokers/amqp/consumer.go | 79 |
1 files changed, 67 insertions, 12 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go index 22eee2dc..a7916f7e 100644 --- a/plugins/jobs/brokers/amqp/consumer.go +++ b/plugins/jobs/brokers/amqp/consumer.go @@ -21,6 +21,7 @@ const ( exchangeType string = "exchange-type" queue string = "queue" routingKey string = "routing-key" + prefetch string = "prefetch" dlx string = "x-dead-letter-exchange" dlxRoutingKey string = "x-dead-letter-routing-key" @@ -76,10 +77,11 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, // firs part - address to connect, it is located in the global section under the amqp pluginName // second part - queues and other pipeline information jb := &JobsConsumer{ - log: log, - pq: pq, - consumeID: uuid.NewString(), - stopCh: make(chan struct{}), + log: log, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + // TODO to config retryTimeout: time.Minute * 5, delayCache: make(map[string]struct{}, 100), } @@ -105,7 +107,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, pipeCfg.InitDefault() - err = cfg.UnmarshalKey(configKey, &globalCfg) + err = cfg.UnmarshalKey(pluginName, &globalCfg) if err != nil { return nil, errors.E(op, err) } @@ -125,7 +127,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return nil, errors.E(op, err) } - // assign address + // save address jb.connStr = globalCfg.Addr err = jb.initRabbitMQ() @@ -144,12 +146,65 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer, return jb, nil } -func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobsConsumer, error) { - _ = exchangeType - _ = exchangeKey - _ = queue - _ = routingKey - panic("not implemented") +func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) { + const op = errors.Op("new_amqp_consumer_from_pipeline") + // we need to obtain two parts of the amqp information here. + // firs part - address to connect, it is located in the global section under the amqp pluginName + // second part - queues and other pipeline information + jb := &JobsConsumer{ + log: log, + pq: pq, + consumeID: uuid.NewString(), + stopCh: make(chan struct{}), + retryTimeout: time.Minute * 5, + delayCache: make(map[string]struct{}, 100), + } + + // if no global section + if !cfg.Has(pluginName) { + return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs")) + } + + // PARSE CONFIGURATION ------- + var globalCfg GlobalCfg + + err := cfg.UnmarshalKey(pluginName, &globalCfg) + if err != nil { + return nil, errors.E(op, err) + } + + globalCfg.InitDefault() + + jb.routingKey = pipeline.String(routingKey, "") + jb.queue = pipeline.String(queue, "default") + jb.exchangeType = pipeline.String(exchangeType, "direct") + jb.exchangeName = pipeline.String(exchangeKey, "amqp.default") + jb.prefetchCount = pipeline.Int(prefetch, 10) + + // PARSE CONFIGURATION ------- + + jb.conn, err = amqp.Dial(globalCfg.Addr) + if err != nil { + return nil, errors.E(op, err) + } + + // save address + jb.connStr = globalCfg.Addr + + err = jb.initRabbitMQ() + if err != nil { + return nil, errors.E(op, err) + } + + jb.publishChan, err = jb.conn.Channel() + if err != nil { + return nil, errors.E(op, err) + } + + // run redialer for the connection + jb.redialer() + + return jb, nil } func (j *JobsConsumer) Push(job *structs.Job) error { |