summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
committerValery Piashchynski <[email protected]>2021-07-21 16:52:41 +0300
commitb2da831f47284974551710d2767a7bdde0efa51d (patch)
tree7d8fee59cdb307110d2fcd872635437e0203321b /plugins/jobs/plugin.go
parent50cf036c81668508c8f2e9130bc5a2019cddf1b9 (diff)
Fix AMQP context, add ID, job, other fields.
Fix sqs queue re-creation. Complete redia for the beanstalk. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go23
1 files changed, 16 insertions, 7 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 98e7ebf8..47d31d99 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -20,6 +20,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -194,11 +195,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
return
default:
// get data JOB from the queue
- job := p.queue.ExtractMin()
+ jb := p.queue.ExtractMin()
- ctx, err := job.Context()
+ ctx, err := jb.Context()
if err != nil {
- errNack := job.Nack()
+ errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
}
@@ -208,14 +209,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
exec := payload.Payload{
Context: ctx,
- Body: job.Body(),
+ Body: jb.Body(),
}
// protect from the pool reset
p.RLock()
- _, err = p.workersPool.Exec(exec)
+
+ // TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
+ p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context))
+
+ resp, err := p.workersPool.Exec(exec)
if err != nil {
- errNack := job.Nack()
+ errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
}
@@ -226,9 +231,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
p.RUnlock()
- errAck := job.Ack()
+ // TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
+ p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context))
+
+ errAck := jb.Ack()
if errAck != nil {
p.log.Error("acknowledge failed", "error", errAck)
+ continue
}
// TEST HELPER, SHOULD BE DELETED IN THE RELEASE <-----------------------------------------------------
atomic.AddUint64(&rate, 1)