diff options
author | Valery Piashchynski <[email protected]> | 2021-08-12 09:38:47 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-12 09:38:47 +0300 |
commit | bf2f7167ae49ecac981c7c18a9b9b496fd0a514c (patch) | |
tree | 9c5c11ab7984df0775d62553808fc95d395e20d5 | |
parent | 2d460062c97f9ad1e793831c54ae4d177dea83e8 (diff) |
Made QoS production like
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | plugins/jobs/drivers/amqp/config.go | 3 | ||||
-rw-r--r-- | plugins/jobs/drivers/amqp/item.go | 7 | ||||
-rw-r--r-- | plugins/reload/plugin.go | 2 |
3 files changed, 7 insertions, 5 deletions
diff --git a/plugins/jobs/drivers/amqp/config.go b/plugins/jobs/drivers/amqp/config.go index 3fb0d066..73482d4d 100644 --- a/plugins/jobs/drivers/amqp/config.go +++ b/plugins/jobs/drivers/amqp/config.go @@ -38,6 +38,7 @@ type Config struct { } func (c *Config) InitDefault() { + // all options should be in sync with the pipeline defaults in the FromPipeline method if c.ExchangeType == "" { c.ExchangeType = "direct" } @@ -47,7 +48,7 @@ func (c *Config) InitDefault() { } if c.Prefetch == 0 { - c.Prefetch = 100 + c.Prefetch = 10 } if c.Priority == 0 { diff --git a/plugins/jobs/drivers/amqp/item.go b/plugins/jobs/drivers/amqp/item.go index f252acd8..9b9625b0 100644 --- a/plugins/jobs/drivers/amqp/item.go +++ b/plugins/jobs/drivers/amqp/item.go @@ -54,6 +54,7 @@ type Options struct { // This method must not be used to select or requeue messages the client wishes not to handle, rather it is to inform the server that the client is incapable of handling this message at this time nack func(multiply bool, requeue bool) error + // requeueFn used as a pointer to the push function requeueFn func(context.Context, *Item) error multipleAsk bool @@ -123,9 +124,9 @@ func (i *Item) Requeue(headers map[string][]string, delay int64) error { err := i.Options.requeueFn(context.Background(), i) if err != nil { - errAck := i.Options.nack(false, true) - if errAck != nil { - return fmt.Errorf("requeue error: %v\nack error: %v", err, errAck) + errNack := i.Options.nack(false, true) + if errNack != nil { + return fmt.Errorf("requeue error: %v\nack error: %v", err, errNack) } return err diff --git a/plugins/reload/plugin.go b/plugins/reload/plugin.go index 7e6bdfec..d3271d6c 100644 --- a/plugins/reload/plugin.go +++ b/plugins/reload/plugin.go @@ -117,7 +117,7 @@ func (s *Plugin) Serve() chan error { timer.Stop() // replace previous value in map by more recent without adding new one updated[cfg.service] = cfg.serviceConfig - // if we getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing) + // if we are getting a lot of events, we shouldn't restart particular service on each of it (user doing batch move or very fast typing) // instead, we are resetting the timer and wait for s.cfg.Interval time // If there is no more events, we restart service only once timer.Reset(s.cfg.Interval) |