summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
committerValery Piashchynski <[email protected]>2021-08-11 22:03:34 +0300
commit2d460062c97f9ad1e793831c54ae4d177dea83e8 (patch)
treed796a11941fab4be668843a3fcbd83ea0859db39 /plugins/jobs/plugin.go
parente855ae9fe5673bd95f45f9a265259cb5ecdc9f81 (diff)
Durable requeue algo. Update AMQP and Beanstalk tests to use mock
logger. Fix bugs discovered during testing. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go28
1 files changed, 15 insertions, 13 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 8ea18cfd..87559034 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -104,19 +104,6 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return nil
}
-func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
- pld := p.pldPool.Get().(*payload.Payload)
- pld.Body = body
- pld.Context = context
- return pld
-}
-
-func (p *Plugin) putPayload(pld *payload.Payload) {
- pld.Body = nil
- pld.Context = nil
- p.pldPool.Put(pld)
-}
-
func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("jobs_plugin_serve")
@@ -261,6 +248,8 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
+ // free the resources
+ jb.Recycle()
// return payload
p.putPayload(exec)
}
@@ -565,3 +554,16 @@ func (p *Plugin) collectJobsEvents(event interface{}) {
}
}
}
+
+func (p *Plugin) getPayload(body, context []byte) *payload.Payload {
+ pld := p.pldPool.Get().(*payload.Payload)
+ pld.Body = body
+ pld.Context = context
+ return pld
+}
+
+func (p *Plugin) putPayload(pld *payload.Payload) {
+ pld.Body = nil
+ pld.Context = nil
+ p.pldPool.Put(pld)
+}