diff options
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 31 |
1 files changed, 14 insertions, 17 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 6c848a9d..61936db2 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -18,7 +18,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/jobs/pipeline" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -81,6 +80,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.consumers = make(map[string]jobs.Consumer) p.consume = make(map[string]struct{}) p.stopCh = make(chan struct{}, 1) + p.pldPool = sync.Pool{New: func() interface{} { // with nil fields return &payload.Payload{} @@ -104,8 +104,11 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se return nil } -func (p *Plugin) getPayload() *payload.Payload { - return p.pldPool.Get().(*payload.Payload) +func (p *Plugin) getPayload(body, context []byte) *payload.Payload { + pld := p.pldPool.Get().(*payload.Payload) + pld.Body = body + pld.Context = context + return pld } func (p *Plugin) putPayload(pld *payload.Payload) { @@ -219,13 +222,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit } // get payload from the sync.Pool - exec := p.getPayload() - exec.Body = jb.Body() - exec.Context = ctx - - // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- - // remove in tests - p.log.Debug("request", "body:", utils.AsString(exec.Body), "context:", utils.AsString(exec.Context)) + exec := p.getPayload(jb.Body(), ctx) // protect from the pool reset p.RLock() @@ -243,14 +240,14 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } - // TODO REMOVE AFTER TESTS <--------------------------------------------------------------------------- - // remove in tests - p.log.Debug("response", "body:", utils.AsString(resp.Body), "context:", utils.AsString(resp.Context)) - - errAck := jb.Ack() - if errAck != nil { - p.log.Error("acknowledge failed", "error", errAck) + // handle the response protocol + err = handleResponse(resp.Body, jb, p.log) + if err != nil { p.putPayload(exec) + errNack := jb.Nack() + if errNack != nil { + p.log.Error("negatively acknowledge failed, job might be lost", "error", errNack) + } continue } |