diff options
author | Valery Piashchynski <[email protected]> | 2021-08-12 09:38:47 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-12 09:38:47 +0300 |
commit | bf2f7167ae49ecac981c7c18a9b9b496fd0a514c (patch) | |
tree | 9c5c11ab7984df0775d62553808fc95d395e20d5 /plugins/jobs/drivers | |
parent | 2d460062c97f9ad1e793831c54ae4d177dea83e8 (diff) |
Made QoS production like
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/drivers')
-rw-r--r-- | plugins/jobs/drivers/amqp/config.go | 3 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 7 |
2 files changed, 6 insertions, 4 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go index 3fb0d066..73482d4d 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/jobs/drivers/amqp/config.go @@ -38,6 +38,7 @@ type Config struct { } func (c *Config) InitDefault() { + // all options should be in sync with the pipeline defaults in the FromPipeline method if c.ExchangeType == "" { c.ExchangeType = "direct" } @@ -47,7 +48,7 @@ func (c *Config) InitDefault() { } if c.Prefetch == 0 { - c.Prefetch = 100 + c.Prefetch = 10 } if c.Priority == 0 { diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index f252acd8..9b9625b0 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -54,6 +54,7 @@ type Options struct { // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error + // requeueFn used as a pointer to the push function requeueFn func(context.Context, *Item) error multipleAsk bool @@ -123,9 +124,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { err := i.Options.requeueFn(context.Background(), i) if err != nil { - errAck := i.Options.nack(false, true) - if errAck != nil { - return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck) + errNack := i.Options.nack(false, true) + if errNack != nil { + return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack) } return err |