summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-12 09:38:47 +0300
committerValery Piashchynski <[email protected]>2021-08-12 09:38:47 +0300
commitbf2f7167ae49ecac981c7c18a9b9b496fd0a514c (patch)
tree9c5c11ab7984df0775d62553808fc95d395e20d5
parent2d460062c97f9ad1e793831c54ae4d177dea83e8 (diff)
Made QoS production like
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--plugins/jobs/drivers/amqp/config.go3
-rw-r--r--plugins/jobs/drivers/amqp/item.go7
-rw-r--r--plugins/reload/plugin.go2
3 files changed, 7 insertions, 5 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go
index 3fb0d066..73482d4d 100644
--- a/plugins/jobs/drivers/amqp/config.go
+++ b/plugins/jobs/drivers/amqp/config.go
@@ -38,6 +38,7 @@ type Config struct {
}
func (c *Config) InitDefault() {
+ // all options should be in sync with the pipeline defaults in the FromPipeline method
if c.ExchangeType == "" {
c.ExchangeType = "direct"
}
@@ -47,7 +48,7 @@ func (c *Config) InitDefault() {
}
if c.Prefetch == 0 {
- c.Prefetch = 100
+ c.Prefetch = 10
}
if c.Priority == 0 {
diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go
index f252acd8..9b9625b0 100644
--- a/plugins/jobs/drivers/amqp/item.go
+++ b/plugins/jobs/drivers/amqp/item.go
@@ -54,6 +54,7 @@ type Options struct {
// This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time
nack func(multiply bool, requeue bool) error
+ // requeueFn used as a pointer to the push function
requeueFn func(context.Context, *Item) error
multipleAsk bool
@@ -123,9 +124,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error {
err := i.Options.requeueFn(context.Background(), i)
if err != nil {
- errAck := i.Options.nack(false, true)
- if errAck != nil {
- return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck)
+ errNack := i.Options.nack(false, true)
+ if errNack != nil {
+ return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack)
}
return err
diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go
index 7e6bdfec..d3271d6c 100644
--- a/plugins/reload/plugin.go
+++ b/plugins/reload/plugin.go
@@ -117,7 +117,7 @@ func (s *Plugin) Serve() chan error {
timer.Stop()
// replace previous value in map by more recent without adding new one
updated[cfg.service] = cfg.serviceConfig
- // if we getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing)
+ // if we are getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing)
// instead, we are resetting the timer and wait for s.cfg.Interval time
// If there is no more events, we restart service only once
timer.Reset(s.cfg.Interval)