summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-08-12 11:28:45 +0300
committerValery Piashchynski <[email protected]>2021-08-12 11:28:45 +0300
commit4169e8374f581ba2213f8cd1833cc6b9b84438e8 (patch)
treeb1d911fbd0ef5960c0513553d8be94809db8b14b /plugins/jobs/plugin.go
parentbf2f7167ae49ecac981c7c18a9b9b496fd0a514c (diff)
Fix various bugs in the SQS. Implement SQS tests for the
jobs_ok.php/jobs_err.php workers. Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go15
1 files changed, 9 insertions, 6 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 87559034..e2fffda7 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -30,17 +30,18 @@ const (
)
type Plugin struct {
- cfg *Config `structure:"jobs"`
- log logger.Logger
-
sync.RWMutex
+ // Jobs plugin configuration
+ cfg *Config `structure:"jobs"`
+ log logger.Logger
workersPool pool.Pool
server server.Server
jobConstructors map[string]jobs.Constructor
consumers map[string]jobs.Consumer
+ // events handler
events events.Handler
// priority queue implementation
@@ -55,6 +56,7 @@ type Plugin struct {
// signal channel to stop the pollers
stopCh chan struct{}
+ // internal payloads pool
pldPool sync.Pool
}
@@ -189,7 +191,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
jb := p.queue.ExtractMin()
// parse the context
- // for the each job, context contains:
+ // for each job, context contains:
/*
1. Job class
2. Job ID provided from the outside
@@ -216,12 +218,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
resp, err := p.workersPool.Exec(exec)
p.RUnlock()
if err != nil {
+ // RR protocol level error, Nack the job
errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
}
- p.log.Error("job execute", "error", err)
+ p.log.Error("job execute failed", "error", err)
p.putPayload(exec)
continue
@@ -310,7 +313,7 @@ func (p *Plugin) Reset() error {
defer p.Unlock()
const op = errors.Op("jobs_plugin_reset")
- p.log.Info("JOBS plugin got restart request. Restarting...")
+ p.log.Info("JOBS plugin received restart request. Restarting...")
p.workersPool.Destroy(context.Background())
p.workersPool = nil