summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp/consumer.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 13:53:19 +0300
committerValery Piashchynski <[email protected]>2021-07-22 13:53:19 +0300
commit05660fcd256963eac94ada90f7baa409344f9e73 (patch)
tree72fe19d7c6b05eda1c5e5cc85cb536878bd8aa24 /plugins/jobs/drivers/amqp/consumer.go
parent182199a6449677a620813e3a8157cd0406095435 (diff)
Update consumers, tests stabilization
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp/consumer.go')
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go92
1 files changed, 49 insertions, 43 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 6def138e..d592a17a 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -29,17 +29,19 @@ type JobsConsumer struct {
conn *amqp.Connection
consumeChan *amqp.Channel
publishChan *amqp.Channel
+ consumeID string
+ connStr string
retryTimeout time.Duration
- prefetchCount int
+ prefetch int
priority int64
exchangeName string
queue string
exclusive bool
- consumeID string
- connStr string
exchangeType string
routingKey string
+ multipleAck bool
+ requeueOnFail bool
delayCache map[string]struct{}
@@ -53,17 +55,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
- jb := &JobsConsumer{
- log: log,
- pq: pq,
- eh: e,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- // TODO to config
- retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
- }
-
// if no such key - error
if !cfg.Has(configKey) {
return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
@@ -74,7 +65,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
}
- // PARSE CONFIGURATION -------
+ // PARSE CONFIGURATION START -------
var pipeCfg Config
var globalCfg GlobalCfg
@@ -91,16 +82,28 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
}
globalCfg.InitDefault()
+ // PARSE CONFIGURATION END -------
- jb.routingKey = pipeCfg.RoutingKey
- jb.queue = pipeCfg.Queue
- jb.exchangeType = pipeCfg.ExchangeType
- jb.exchangeName = pipeCfg.Exchange
- jb.prefetchCount = pipeCfg.PrefetchCount
- jb.exclusive = pipeCfg.Exclusive
- jb.priority = pipeCfg.Priority
-
- // PARSE CONFIGURATION -------
+ jb := &JobsConsumer{
+ log: log,
+ pq: pq,
+ eh: e,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ // TODO to config
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+ priority: pipeCfg.Priority,
+
+ routingKey: pipeCfg.RoutingKey,
+ queue: pipeCfg.Queue,
+ exchangeType: pipeCfg.ExchangeType,
+ exchangeName: pipeCfg.Exchange,
+ prefetch: pipeCfg.Prefetch,
+ exclusive: pipeCfg.Exclusive,
+ multipleAck: pipeCfg.MultipleAck,
+ requeueOnFail: pipeCfg.RequeueOnFail,
+ }
jb.conn, err = amqp.Dial(globalCfg.Addr)
if err != nil {
@@ -131,15 +134,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
- jb := &JobsConsumer{
- log: log,
- eh: e,
- pq: pq,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
- }
// only global section
if !cfg.Has(pluginName) {
@@ -156,16 +150,28 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
globalCfg.InitDefault()
- jb.routingKey = pipeline.String(routingKey, "")
- jb.queue = pipeline.String(queue, "default")
- jb.exchangeType = pipeline.String(exchangeType, "direct")
- jb.exchangeName = pipeline.String(exchangeKey, "amqp.default")
- jb.prefetchCount = pipeline.Int(prefetch, 10)
- jb.priority = int64(pipeline.Int(priority, 10))
- jb.exclusive = pipeline.Bool(exclusive, true)
-
// PARSE CONFIGURATION -------
+ jb := &JobsConsumer{
+ log: log,
+ eh: e,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+
+ routingKey: pipeline.String(routingKey, ""),
+ queue: pipeline.String(queue, "default"),
+ exchangeType: pipeline.String(exchangeType, "direct"),
+ exchangeName: pipeline.String(exchangeKey, "amqp.default"),
+ prefetch: pipeline.Int(prefetch, 10),
+ priority: int64(pipeline.Int(priority, 10)),
+ exclusive: pipeline.Bool(exclusive, true),
+ multipleAck: pipeline.Bool(multipleAsk, false),
+ requeueOnFail: pipeline.Bool(requeueOnFail, false),
+ }
+
jb.conn, err = amqp.Dial(globalCfg.Addr)
if err != nil {
return nil, errors.E(op, err)
@@ -315,7 +321,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error {
return errors.E(op, err)
}
- err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
if err != nil {
return errors.E(op, err)
}
@@ -409,7 +415,7 @@ func (j *JobsConsumer) Resume(p string) {
return
}
- err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
if err != nil {
j.log.Error("qos set failed", "error", err)
return