summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go31
1 files changed, 14 insertions, 17 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 6c848a9d..61936db2 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -18,7 +18,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/jobs/pipeline"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -81,6 +80,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.consumers = make(map[string]jobs.Consumer)
p.consume = make(map[string]struct{})
p.stopCh = make(chan struct{}, 1)
+
p.pldPool = sync.Pool{New: func() interface{} {
// with nil fields
return &payload.Payload{}
@@ -104,8 +104,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return nil
}
-func (p *Plugin) getPayload() *payload.Payload {
- return p.pldPool.Get().(*payload.Payload)
+func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
+ pld := p.pldPool.Get().(*payload.Payload)
+ pld.Body = body
+ pld.Context = context
+ return pld
}
func (p *Plugin) putPayload(pld *payload.Payload) {
@@ -219,13 +222,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
}
// get payload from the sync.Pool
- exec := p.getPayload()
- exec.Body = jb.Body()
- exec.Context = ctx
-
- // TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
- // remove in tests
- p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context))
+ exec := p.getPayload(jb.Body(), ctx)
// protect from the pool reset
p.RLock()
@@ -243,14 +240,14 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
- // TODO REMOVE AFTER TESTS <---------------------------------------------------------------------------
- // remove in tests
- p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context))
-
- errAck := jb.Ack()
- if errAck != nil {
- p.log.Error("acknowledge failed", "error", errAck)
+ // handle the response protocol
+ err = handleResponse(resp.Body, jb, p.log)
+ if err != nil {
p.putPayload(exec)
+ errNack := jb.Nack()
+ if errNack != nil {
+ p.log.Error("negatively acknowledge failed, job might be lost", "error", errNack)
+ }
continue
}