diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 16:13:49 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-18 16:13:49 +0300 |
commit | c35fbff05205330ab8e49f6008fdbd59128cee14 (patch) | |
tree | d3eb03e8db7231d97ae4ff1d60a0c5a50db8a6fb /plugins/jobs/plugin.go | |
parent | 1d092e57afb55a01283b41942ca3ef15a7e4bdef (diff) |
Add Prometheus metrics for the jobs, update tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r-- | plugins/jobs/plugin.go | 94 |
1 file changed, 86 insertions, 8 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 5f6c8b94..5e62c5c5 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -59,7 +59,8 @@ type Plugin struct { stopCh chan struct{} // internal payloads pool - pldPool sync.Pool + pldPool sync.Pool + statsExporter *statsExporter } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -105,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize) p.log = log + // metrics + p.statsExporter = newStatsExporter(p) + p.events.AddListener(p.statsExporter.metricsCallback) + return nil } @@ -202,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit 5. Pipeline name */ + start := time.Now() + p.events.Push(events.JobEvent{ + Event: events.EventJobStart, + ID: jb.ID(), + Start: start, + Elapsed: 0, + }) + ctx, err := jb.Context() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) + errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) @@ -220,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit resp, err := p.workersPool.Exec(exec) p.RUnlock() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // RR protocol level error, Nack the job errNack := jb.Nack() if errNack != nil { @@ -237,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.putPayload(exec) err = jb.Ack() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.log.Error("acknowledge error, job might be missed", "error", err) continue } + + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + 
Elapsed: time.Since(start), + }) } // handle the response protocol err = handleResponse(resp.Body, jb, p.log) if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.putPayload(exec) errNack := jb.Nack() if errNack != nil { @@ -256,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // return payload p.putPayload(exec) } @@ -359,7 +410,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback) if err != nil { return errors.E(op, err) } @@ -372,6 +423,7 @@ func (p *Plugin) Reset() error { func (p *Plugin) Push(j *job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j.Options.Pipeline) if !ok { @@ -397,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error { err := d.Push(ctx, j) if err != nil { - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushOK, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return nil } @@ -410,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") for i := 0; i < len(j); i++ { + start := time.Now() // get the pipeline for the 
job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -432,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error { err := d.Push(ctx, j[i]) if err != nil { cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j[i].Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } @@ -576,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) { case events.EventPipePaused: p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobStart: - p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobOK: - p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushOK: p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushError: - p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobError: - p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPipeActive: p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case 
events.EventPipeStopped: |