summaryrefslogtreecommitdiff
path: root/plugins/jobs/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/jobs/plugin.go')
-rw-r--r--plugins/jobs/plugin.go94
1 files changed, 86 insertions, 8 deletions
diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go
index 5f6c8b94..5e62c5c5 100644
--- a/plugins/jobs/plugin.go
+++ b/plugins/jobs/plugin.go
@@ -59,7 +59,8 @@ type Plugin struct {
stopCh chan struct{}
// internal payloads pool
- pldPool sync.Pool
+ pldPool sync.Pool
+ statsExporter *statsExporter
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
@@ -105,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize)
p.log = log
+ // metrics
+ p.statsExporter = newStatsExporter(p)
+ p.events.AddListener(p.statsExporter.metricsCallback)
+
return nil
}
@@ -202,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
5. Pipeline name
*/
+ start := time.Now()
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobStart,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: 0,
+ })
+
ctx, err := jb.Context()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
+
errNack := jb.Nack()
if errNack != nil {
p.log.Error("negatively acknowledge failed", "error", errNack)
@@ -220,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
resp, err := p.workersPool.Exec(exec)
p.RUnlock()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
// RR protocol level error, Nack the job
errNack := jb.Nack()
if errNack != nil {
@@ -237,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.putPayload(exec)
err = jb.Ack()
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
p.log.Error("acknowledge error, job might be missed", "error", err)
continue
}
+
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobOK,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
}
// handle the response protocol
err = handleResponse(resp.Body, jb, p.log)
if err != nil {
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobError,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
p.putPayload(exec)
errNack := jb.Nack()
if errNack != nil {
@@ -256,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
+ p.events.Push(events.JobEvent{
+ Event: events.EventJobOK,
+ ID: jb.ID(),
+ Start: start,
+ Elapsed: time.Since(start),
+ })
// return payload
p.putPayload(exec)
}
@@ -359,7 +410,7 @@ func (p *Plugin) Reset() error {
p.workersPool = nil
var err error
- p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents)
+ p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback)
if err != nil {
return errors.E(op, err)
}
@@ -372,6 +423,7 @@ func (p *Plugin) Reset() error {
func (p *Plugin) Push(j *job.Job) error {
const op = errors.Op("jobs_plugin_push")
+ start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j.Options.Pipeline)
if !ok {
@@ -397,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error {
err := d.Push(ctx, j)
if err != nil {
- cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushError,
+ ID: j.Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return errors.E(op, err)
}
- cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushOK,
+ ID: j.Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return nil
}
@@ -410,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
const op = errors.Op("jobs_plugin_push")
for i := 0; i < len(j); i++ {
+ start := time.Now()
// get the pipeline for the job
pipe, ok := p.pipelines.Load(j[i].Options.Pipeline)
if !ok {
@@ -432,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error {
err := d.Push(ctx, j[i])
if err != nil {
cancel()
+ p.events.Push(events.JobEvent{
+ Event: events.EventPushError,
+ ID: j[i].Ident,
+ Pipeline: ppl.Name(),
+ Driver: ppl.Driver(),
+ Start: start,
+ Elapsed: time.Since(start),
+ Error: err,
+ })
return errors.E(op, err)
}
@@ -576,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) {
case events.EventPipePaused:
p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobStart:
- p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobOK:
- p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPushOK:
p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPushError:
- p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventJobError:
- p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
+ p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPipeActive:
p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed)
case events.EventPipeStopped: