diff options
author | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-07-21 16:52:41 +0300 |
commit | b2da831f47284974551710d2767a7bdde0efa51d (patch) | |
tree | 7d8fee59cdb307110d2fcd872635437e0203321b /plugins/jobs/plugin.go | |
parent | 50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff) |
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation.
Complete redia for the beanstalk.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 98e7ebf8..47d31d99 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -20,6 +20,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -194,11 +195,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit return default: // get data JOB from the queue - job := p.queue.ExtractMin() + jb := p.queue.ExtractMin() - ctx, err := job.Context() + ctx, err := jb.Context() if err != nil { - errNack := job.Nack() + errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) } @@ -208,14 +209,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit exec := payload.Payload{ Context: ctx, - Body: job.Body(), + Body: jb.Body(), } // protect from the pool reset p.RLock() - _, err = p.workersPool.Exec(exec) + + // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context)) + + resp, err := p.workersPool.Exec(exec) if err != nil { - errNack := job.Nack() + errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) } @@ -226,9 +231,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } p.RUnlock() - errAck := job.Ack() + // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- + p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context)) + + errAck := jb.Ack() if errAck != nil { p.log.Error("acknowledge failed", "error", errAck) + continue } // TEST HELPER, SHOULD BE DELETED IN THE RELEASE <----------------------------------------------------- atomic.AddUint64(&rate, 1) |