diff options
author | Valery Piashchynski <[email protected]> | 2021-08-12 11:28:45 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-12 11:28:45 +0300 |
commit | 4169e8374f581ba2213f8cd1833cc6b9b84438e8 (patch) | |
tree | b1d911fbd0ef5960c0513553d8be94809db8b14b /plugins/jobs/plugin.go | |
parent | bf2f7167ae49ecac981c7c18a9b9b496fd0a514c (diff) |
Fix various bugs in the SQS. Implement SQS tests for the
jobs_ok.php/jobs_err.php workers.
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 87559034..e2fffda7 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -30,17 +30,18 @@ const ( ) type Plugin struct { - cfg *Config `structure:"jobs"` - log logger.Logger - sync.RWMutex + // Jobs plugin configuration + cfg *Config `structure:"jobs"` + log logger.Logger workersPool pool.Pool server server.Server jobConstructors map[string]jobs.Constructor consumers map[string]jobs.Consumer + // events handler events events.Handler // priority queue implementation @@ -55,6 +56,7 @@ type Plugin struct { // signal channel to stop the pollers stopCh chan struct{} + // internal payloads pool pldPool sync.Pool } @@ -189,7 +191,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit jb := p.queue.ExtractMin() // parse the context - // for the each job, context contains: + // for each job, context contains: /* 1. Job class 2. Job ID provided from the outside @@ -216,12 +218,13 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit resp, err := p.workersPool.Exec(exec) p.RUnlock() if err != nil { + // RR protocol level error, Nack the job errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) } - p.log.Error("job execute", "error", err) + p.log.Error("job execute failed", "error", err) p.putPayload(exec) continue @@ -310,7 +313,7 @@ func (p *Plugin) Reset() error { defer p.Unlock() const op = errors.Op("jobs_plugin_reset") - p.log.Info("JOBS plugin got restart request. Restarting...") + p.log.Info("JOBS plugin received restart request. Restarting...") p.workersPool.Destroy(context.Background()) p.workersPool = nil |