summaryrefslogtreecommitdiff
path: root/plugins/jobs/drivers
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-12 09:38:47 +0300
committerValery Piashchynski <[email protected]>2021-08-12 09:38:47 +0300
commitbf2f7167ae49ecac981c7c18a9b9b496fd0a514c (patch)
tree9c5c11ab7984df0775d62553808fc95d395e20d5 /plugins/jobs/drivers
parent2d460062c97f9ad1e793831c54ae4d177dea83e8 (diff)
Made QoS production like
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r--plugins/jobs/drivers/amqp/config.go3
-rw-r--r--plugins/jobs/drivers/amqp/item.go7
2 files changed, 6 insertions, 4 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
index 3fb0d066..73482d4d 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -38,6 +38,7 @@ type Config struct {
}
func (c *Config) InitDefault() {
+ // all options should be in sync with the pipeline defaults in the FromPipeline method
if c.ExchangeType == "" {
c.ExchangeType = "direct"
}
@@ -47,7 +48,7 @@ func (c *Config) InitDefault() {
}
if c.Prefetch == 0 {
- c.Prefetch = 100
+ c.Prefetch = 10
}
if c.Priority == 0 {
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index f252acd8..9b9625b0 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -54,6 +54,7 @@ type Options struct {
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
+ // requeueFn used as a pointer to the push function
requeueFn func(context.Context, *Item) error
multipleAsk bool
@@ -123,9 +124,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
- errAck := i.Options.nack(false, true)
- if errAck != nil {
- return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck)
+ errNack := i.Options.nack(false, true)
+ if errNack != nil {
+ return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack)
}
return err