summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers/amqp
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-22 13:53:19 +0300
committerValery Piashchynski <[email protected]>2021-07-22 13:53:19 +0300
commit05660fcd256963eac94ada90f7baa409344f9e73 (patch)
tree72fe19d7c6b05eda1c5e5cc85cb536878bd8aa24 /plugins/jobs/drivers/amqp
parent182199a6449677a620813e3a8157cd0406095435 (diff)
Update consumers, tests stabilization
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers/amqp')
-rw-r--r--plugins/jobs/drivers/amqp/config.go24
-rw-r--r--plugins/jobs/drivers/amqp/consumer.go92
-rw-r--r--plugins/jobs/drivers/amqp/item.go13
3 files changed, 73 insertions, 56 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
index 7befb3c8..2a1aed20 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -2,13 +2,15 @@ package amqp
// pipeline rabbitmq info
const (
- exchangeKey string = "exchange"
- exchangeType string = "exchange-type"
- queue string = "queue"
- routingKey string = "routing-key"
- prefetch string = "prefetch"
- exclusive string = "exclusive"
- priority string = "priority"
+ exchangeKey string = "exchange"
+ exchangeType string = "exchange-type"
+ queue string = "queue"
+ routingKey string = "routing-key"
+ prefetch string = "prefetch"
+ exclusive string = "exclusive"
+ priority string = "priority"
+ multipleAsk string = "multiple_ask"
+ requeueOnFail string = "requeue_on_fail"
dlx string = "x-dead-letter-exchange"
dlxRoutingKey string = "x-dead-letter-routing-key"
@@ -24,13 +26,15 @@ type GlobalCfg struct {
// Config is used to parse pipeline configuration
type Config struct {
- PrefetchCount int `mapstructure:"pipeline_size"`
+ Prefetch int `mapstructure:"prefetch"`
Queue string `mapstructure:"queue"`
Priority int64 `mapstructure:"priority"`
Exchange string `mapstructure:"exchange"`
ExchangeType string `mapstructure:"exchange_type"`
RoutingKey string `mapstructure:"routing_key"`
Exclusive bool `mapstructure:"exclusive"`
+ MultipleAck bool `mapstructure:"multiple_ask"`
+ RequeueOnFail bool `mapstructure:"requeue_on_fail"`
}
func (c *Config) InitDefault() {
@@ -42,8 +46,8 @@ func (c *Config) InitDefault() {
c.Exchange = "default"
}
- if c.PrefetchCount == 0 {
- c.PrefetchCount = 100
+ if c.Prefetch == 0 {
+ c.Prefetch = 100
}
if c.Priority == 0 {
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go
index 6def138e..d592a17a 100644
--- a/plugins/jobs/drivers/amqp/consumer.go
+++ b/plugins/jobs/drivers/amqp/consumer.go
@@ -29,17 +29,19 @@ type JobsConsumer struct {
conn *amqp.Connection
consumeChan *amqp.Channel
publishChan *amqp.Channel
+ consumeID string
+ connStr string
retryTimeout time.Duration
- prefetchCount int
+ prefetch int
priority int64
exchangeName string
queue string
exclusive bool
- consumeID string
- connStr string
exchangeType string
routingKey string
+ multipleAck bool
+ requeueOnFail bool
delayCache map[string]struct{}
@@ -53,17 +55,6 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
- jb := &JobsConsumer{
- log: log,
- pq: pq,
- eh: e,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- // TODO to config
- retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
- }
-
// if no such key - error
if !cfg.Has(configKey) {
return nil, errors.E(op, errors.Errorf("no configuration by provided key: %s", configKey))
@@ -74,7 +65,7 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
return nil, errors.E(op, errors.Str("no global amqp configuration, global configuration should contain amqp addrs"))
}
- // PARSE CONFIGURATION -------
+ // PARSE CONFIGURATION START -------
var pipeCfg Config
var globalCfg GlobalCfg
@@ -91,16 +82,28 @@ func NewAMQPConsumer(configKey string, log logger.Logger, cfg config.Configurer,
}
globalCfg.InitDefault()
+ // PARSE CONFIGURATION END -------
- jb.routingKey = pipeCfg.RoutingKey
- jb.queue = pipeCfg.Queue
- jb.exchangeType = pipeCfg.ExchangeType
- jb.exchangeName = pipeCfg.Exchange
- jb.prefetchCount = pipeCfg.PrefetchCount
- jb.exclusive = pipeCfg.Exclusive
- jb.priority = pipeCfg.Priority
-
- // PARSE CONFIGURATION -------
+ jb := &JobsConsumer{
+ log: log,
+ pq: pq,
+ eh: e,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ // TODO to config
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+ priority: pipeCfg.Priority,
+
+ routingKey: pipeCfg.RoutingKey,
+ queue: pipeCfg.Queue,
+ exchangeType: pipeCfg.ExchangeType,
+ exchangeName: pipeCfg.Exchange,
+ prefetch: pipeCfg.Prefetch,
+ exclusive: pipeCfg.Exclusive,
+ multipleAck: pipeCfg.MultipleAck,
+ requeueOnFail: pipeCfg.RequeueOnFail,
+ }
jb.conn, err = amqp.Dial(globalCfg.Addr)
if err != nil {
@@ -131,15 +134,6 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
// we need to obtain two parts of the amqp information here.
// firs part - address to connect, it is located in the global section under the amqp pluginName
// second part - queues and other pipeline information
- jb := &JobsConsumer{
- log: log,
- eh: e,
- pq: pq,
- consumeID: uuid.NewString(),
- stopCh: make(chan struct{}),
- retryTimeout: time.Minute * 5,
- delayCache: make(map[string]struct{}, 100),
- }
// only global section
if !cfg.Has(pluginName) {
@@ -156,16 +150,28 @@ func FromPipeline(pipeline *pipeline.Pipeline, log logger.Logger, cfg config.Con
globalCfg.InitDefault()
- jb.routingKey = pipeline.String(routingKey, "")
- jb.queue = pipeline.String(queue, "default")
- jb.exchangeType = pipeline.String(exchangeType, "direct")
- jb.exchangeName = pipeline.String(exchangeKey, "amqp.default")
- jb.prefetchCount = pipeline.Int(prefetch, 10)
- jb.priority = int64(pipeline.Int(priority, 10))
- jb.exclusive = pipeline.Bool(exclusive, true)
-
// PARSE CONFIGURATION -------
+ jb := &JobsConsumer{
+ log: log,
+ eh: e,
+ pq: pq,
+ consumeID: uuid.NewString(),
+ stopCh: make(chan struct{}),
+ retryTimeout: time.Minute * 5,
+ delayCache: make(map[string]struct{}, 100),
+
+ routingKey: pipeline.String(routingKey, ""),
+ queue: pipeline.String(queue, "default"),
+ exchangeType: pipeline.String(exchangeType, "direct"),
+ exchangeName: pipeline.String(exchangeKey, "amqp.default"),
+ prefetch: pipeline.Int(prefetch, 10),
+ priority: int64(pipeline.Int(priority, 10)),
+ exclusive: pipeline.Bool(exclusive, true),
+ multipleAck: pipeline.Bool(multipleAsk, false),
+ requeueOnFail: pipeline.Bool(requeueOnFail, false),
+ }
+
jb.conn, err = amqp.Dial(globalCfg.Addr)
if err != nil {
return nil, errors.E(op, err)
@@ -315,7 +321,7 @@ func (j *JobsConsumer) Run(p *pipeline.Pipeline) error {
return errors.E(op, err)
}
- err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
if err != nil {
return errors.E(op, err)
}
@@ -409,7 +415,7 @@ func (j *JobsConsumer) Resume(p string) {
return
}
- err = j.consumeChan.Qos(j.prefetchCount, 0, false)
+ err = j.consumeChan.Qos(j.prefetch, 0, false)
if err != nil {
j.log.Error("qos set failed", "error", err)
return
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index 6b912dde..bc679037 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -50,6 +50,10 @@ type Options struct {
// Reserve defines for how broker should wait until treating job are failed. Defaults to 30 min.
Timeout int64 `json:"timeout,omitempty"`
+
+ // private
+ multipleAsk bool
+ requeue bool
}
// DelayDuration returns delay duration in a form of time.Duration.
@@ -100,11 +104,11 @@ func (j *Item) Context() ([]byte, error) {
}
func (j *Item) Ack() error {
- return j.AckFunc(false)
+ return j.AckFunc(j.Options.multipleAsk)
}
func (j *Item) Nack() error {
- return j.NackFunc(false, false)
+ return j.NackFunc(false, j.Options.requeue)
}
func (j *JobsConsumer) fromDelivery(d amqp.Delivery) (*Item, error) {
@@ -157,7 +161,10 @@ func pack(id string, j *Item) (amqp.Table, error) {
// unpack restores jobs.Options
func (j *JobsConsumer) unpack(d amqp.Delivery) (*Item, error) {
- item := &Item{Payload: utils.AsString(d.Body), Options: &Options{}}
+ item := &Item{Payload: utils.AsString(d.Body), Options: &Options{
+ multipleAsk: j.multipleAck,
+ requeue: j.requeueOnFail,
+ }}
if _, ok := d.Headers[job.RRID].(string); !ok {
return nil, errors.E(errors.Errorf("missing header `%s`", job.RRID))