diff options
author | Valery Piashchynski <[email protected]> | 2021-07-08 17:54:29 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-08 17:54:29 +0300 |
commit | 4566f88004e81d3229222d82614c15346ac2e47d (patch) | |
tree | 05dc6ffeea8d00cb63cc6a51c17ae2afda8aaa5a /plugins/jobs/plugin.go | |
parent | 5f84c5d5709cff5984a5859651a0bbb1c55fcb0f (diff) |
AMQP update...
Add redialer, consumer, rabbit queues initializer.
Update configuration (.rr.yaml).
Add ack/nack for the jobs main plugin with error handling.
Add Qos, queues bining and declaration.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 16 |
1 files changed, 13 insertions, 3 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 51da9421..9d68a95a 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -171,7 +171,10 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit ctx, err := job.Context() if err != nil { - job.Nack() + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } p.log.Error("job marshal context", "error", err) } @@ -182,7 +185,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit _, err = p.workersPool.Exec(exec) if err != nil { - job.Nack() + errNack := job.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed", "error", errNack) + } + p.log.Error("job execute", "error", err) continue } @@ -190,7 +197,10 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- atomic.AddUint64(&rate, 1) - job.Ack() + errAck := job.Ack() + if errAck != nil { + p.log.Error("acknowledge failed", "error", errAck) + } } }() } |