summaryrefslogtreecommitdiff
path: root/plugins/jobs/brokers/amqp/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-12 08:47:33 +0300
committerValery Piashchynski <[email protected]>2021-07-12 08:47:33 +0300
commite82e9248bb1afd5e571f465ac79ac7f5f79b81f1 (patch)
tree098a9827f51255916f99160b02098153f8d0238e /plugins/jobs/brokers/amqp/consumer.go
parent0f70f1e2311640236d74a0a237536779d8d44223 (diff)
Finish dynamic declaration of the pipelines. Fix issue with
configuration parsing in the AMQP consumer. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/brokers/amqp/consumer.go')
-rw-r--r--plugins/jobs/brokers/amqp/consumer.go79
1 files changed, 67 insertions, 12 deletions
diff --git a/plugins/jobs/brokers/amqp/consumer.go b/plugins/jobs/brokers/amqp/consumer.go
index 22eee2dc..a7916f7e 100644
--- a/plugins/jobs/brokers/amqp/consumer.go
+++ b/plugins/jobs/brokers/amqp/consumer.go
@@ -21,6 +21,7 @@ const (
exchangeType string = "exchange-type"
queue string = "queue"
routingKey string = "routing-key"
+ prefetch string = "prefetch"
dlx string = "x-dead-letter-exchange"
dlxRoutingKey string = "x-dead-letter-routing-key"
@@ -76,10 +77,11 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
// firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
jb := &JobsConsumer{
- log: log,
- pq: pq,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
+ log: log,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ // TODO to config
retryTimeout: time.Minute * 5,
delayCache: make(map[string]struct{}, 100),
}
@@ -105,7 +107,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
pipeCfg.InitDefault()
- err = cfg.UnmarshalKey(configKey, &globalCfg)
+ err = cfg.UnmarshalKey(pluginName, &globalCfg)
if err != nil {
return nil, errors.E(op, err)
}
@@ -125,7 +127,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return nil, errors.E(op, err)
}
- // assign address
+ // save address
jb.connStr = globalCfg.Addr
err = jb.initRabbitMQ()
@@ -144,12 +146,65 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return jb, nil
}
-func FromPipeline(_ *pipeline.Pipeline, _ priorityqueue.Queue) (*JobsConsumer, error) {
- _ = exchangeType
- _ = exchangeKey
- _ = queue
- _ = routingKey
- panic("not implemented")
+func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Configurer, pq priorityqueue.Queue) (*JobsConsumer, error) {
+ const op = errors.Op("new_amqp_consumer_from_pipeline")
+ // we need to obtain two parts of the amqp information here.
+ // firs part - address to connect, it is located in the global section under the amqp pluginName
+ // second part - queues and other pipeline information
+ jb := &JobsConsumer{
+ log: log,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+ }
+
+ // if no global section
+ if !cfg.Has(pluginName) {
+ return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
+ }
+
+ // PARSE CONFIGURATION -------
+ var globalCfg GlobalCfg
+
+ err := cfg.UnmarshalKey(pluginName, &globalCfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ globalCfg.InitDefault()
+
+ jb.routingKey = pipeline.String(routingKey, "")
+ jb.queue = pipeline.String(queue, "default")
+ jb.exchangeType = pipeline.String(exchangeType, "direct")
+ jb.exchangeName = pipeline.String(exchangeKey, "amqp.default")
+ jb.prefetchCount = pipeline.Int(prefetch, 10)
+
+ // PARSE CONFIGURATION -------
+
+ jb.conn, err = amqp.Dial(globalCfg.Addr)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save address
+ jb.connStr = globalCfg.Addr
+
+ err = jb.initRabbitMQ()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ jb.publishChan, err = jb.conn.Channel()
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // run redialer for the connection
+ jb.redialer()
+
+ return jb, nil
}
func (j *JobsConsumer) Push(job *structs.Job) error {