diff options
author | Valery Piashchynski <[email protected]> | 2021-08-18 16:13:49 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-08-18 16:13:49 +0300 |
commit | c35fbff05205330ab8e49f6008fdbd59128cee14 (patch) | |
tree | d3eb03e8db7231d97ae4ff1d60a0c5a50db8a6fb /plugins | |
parent | 1d092e57afb55a01283b41942ca3ef15a7e4bdef (diff) |
Add prometheus metrics for the jobs, update tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/jobs/drivers/amqp/consumer.go | 7 | ||||
-rw-r--r-- | plugins/jobs/drivers/beanstalk/consumer.go | 5 | ||||
-rw-r--r-- | plugins/jobs/drivers/ephemeral/consumer.go | 5 | ||||
-rw-r--r-- | plugins/jobs/drivers/sqs/consumer.go | 5 | ||||
-rw-r--r-- | plugins/jobs/metrics.go | 91 | ||||
-rw-r--r-- | plugins/jobs/plugin.go | 94 | ||||
-rw-r--r-- | plugins/jobs/rpc.go | 1 |
7 files changed, 199 insertions, 9 deletions
diff --git a/plugins/jobs/drivers/amqp/consumer.go b/plugins/jobs/drivers/amqp/consumer.go index f3b40ae3..95df02ec 100644 --- a/plugins/jobs/drivers/amqp/consumer.go +++ b/plugins/jobs/drivers/amqp/consumer.go @@ -307,7 +307,8 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { Driver: pipe.Driver(), Queue: q.Name, Active: int64(q.Messages), - Delayed: *j.delayed, + Delayed: atomic.LoadInt64(j.delayed), + Ready: ready(atomic.LoadUint32(&j.listeners)), }, nil case <-ctx.Done(): @@ -501,3 +502,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { return errors.E(op, errors.TimeOut, ctx.Err()) } } + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/beanstalk/consumer.go b/plugins/jobs/drivers/beanstalk/consumer.go index 6a178dff..6323148b 100644 --- a/plugins/jobs/drivers/beanstalk/consumer.go +++ b/plugins/jobs/drivers/beanstalk/consumer.go @@ -235,6 +235,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: j.tName, + Ready: ready(atomic.LoadUint32(&j.listeners)), } // set stat, skip errors (replace with 0) @@ -353,3 +354,7 @@ func (j *JobConsumer) Resume(_ context.Context, p string) { Start: time.Now(), }) } + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/ephemeral/consumer.go b/plugins/jobs/drivers/ephemeral/consumer.go index 34778642..f0992cd6 100644 --- a/plugins/jobs/drivers/ephemeral/consumer.go +++ b/plugins/jobs/drivers/ephemeral/consumer.go @@ -113,6 +113,7 @@ func (j *JobConsumer) State(_ context.Context) (*jobState.State, error) { Queue: pipe.Name(), Active: atomic.LoadInt64(j.active), Delayed: atomic.LoadInt64(j.delayed), + Ready: ready(atomic.LoadUint32(&j.listeners)), }, nil } @@ -267,3 +268,7 @@ func (j *JobConsumer) consume() { } }() } + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/drivers/sqs/consumer.go b/plugins/jobs/drivers/sqs/consumer.go index d957b3eb..5d419b51 100644 --- a/plugins/jobs/drivers/sqs/consumer.go +++ b/plugins/jobs/drivers/sqs/consumer.go @@ -285,6 +285,7 @@ func (j *JobConsumer) State(ctx context.Context) (*jobState.State, error) { Pipeline: pipe.Name(), Driver: pipe.Driver(), Queue: *j.queueURL, + Ready: ready(atomic.LoadUint32(&j.listeners)), } nom, err := strconv.Atoi(attr.Attributes[string(types.QueueAttributeNameApproximateNumberOfMessages)]) @@ -418,3 +419,7 @@ func (j *JobConsumer) handleItem(ctx context.Context, msg *Item) error { return nil } + +func ready(r uint32) bool { + return r > 0 +} diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go new file mode 100644 index 00000000..61856a10 --- /dev/null +++ b/plugins/jobs/metrics.go @@ -0,0 +1,91 @@ +package jobs + +import ( + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/informer" +) + +func (p *Plugin) MetricsCollector() []prometheus.Collector { + // p - implements Exporter interface (workers) + // other - request duration and count + return []prometheus.Collector{p.statsExporter} +} + +const ( + namespace = "rr_jobs" +) + +type statsExporter struct { + workers informer.Informer + workersMemory uint64 + jobsOk uint64 + pushOk uint64 + jobsErr uint64 + pushErr uint64 +} + +var ( + worker = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil) + pushOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil) + pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil) + jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil) + jobsOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil) +) + +func newStatsExporter(stats informer.Informer) *statsExporter { + return &statsExporter{ + workers: stats, + workersMemory: 0, + jobsOk: 0, + pushOk: 0, + jobsErr: 0, + pushErr: 0, + } +} + +func (se *statsExporter) metricsCallback(event interface{}) { + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { //nolint:exhaustive + case events.EventJobOK: + atomic.AddUint64(&se.jobsOk, 1) + case events.EventPushOK: + atomic.AddUint64(&se.pushOk, 1) + case events.EventPushError: + atomic.AddUint64(&se.pushErr, 1) + case events.EventJobError: + atomic.AddUint64(&se.jobsErr, 1) + } + } +} + +func (se *statsExporter) Describe(d chan<- *prometheus.Desc) { + // send description + d <- pushErr + d <- pushOk + d <- jobsErr + d <- jobsOk +} + +func (se *statsExporter) Collect(ch chan<- prometheus.Metric) { + // get the copy of the processes + workers := se.workers.Workers() + + // cumulative RSS memory in bytes + var cum uint64 + + // collect the memory + for i := 0; i < len(workers); i++ { + cum += workers[i].MemoryUsage + } + + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum)) + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk))) + ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr))) + ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk))) + ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr))) +} diff --git a/plugins/jobs/plugin.go b/plugins/jobs/plugin.go index 5f6c8b94..5e62c5c5 100644 --- a/plugins/jobs/plugin.go +++ b/plugins/jobs/plugin.go @@ -59,7 +59,8 @@ type Plugin struct { stopCh chan struct{} // internal payloads pool - pldPool sync.Pool + pldPool sync.Pool + statsExporter *statsExporter } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { @@ -105,6 +106,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.queue = priorityqueue.NewBinHeap(p.cfg.PipelineSize) p.log = log + // metrics + p.statsExporter = newStatsExporter(p) + p.events.AddListener(p.statsExporter.metricsCallback) + return nil } @@ -202,8 +207,23 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit 5. Pipeline name */ + start := time.Now() + p.events.Push(events.JobEvent{ + Event: events.EventJobStart, + ID: jb.ID(), + Start: start, + Elapsed: 0, + }) + ctx, err := jb.Context() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) + errNack := jb.Nack() if errNack != nil { p.log.Error("negatively acknowledge failed", "error", errNack) @@ -220,6 +240,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit resp, err := p.workersPool.Exec(exec) p.RUnlock() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // RR protocol level error, Nack the job errNack := jb.Nack() if errNack != nil { @@ -237,14 +263,33 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.putPayload(exec) err = jb.Ack() if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.log.Error("acknowledge error, job might be missed", "error", err) continue } + + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) } // handle the response protocol err = handleResponse(resp.Body, jb, p.log) if err != nil { + p.events.Push(events.JobEvent{ + Event: events.EventJobError, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) p.putPayload(exec) errNack := jb.Nack() if errNack != nil { @@ -256,6 +301,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } + p.events.Push(events.JobEvent{ + Event: events.EventJobOK, + ID: jb.ID(), + Start: start, + Elapsed: time.Since(start), + }) // return payload p.putPayload(exec) } @@ -359,7 +410,7 @@ func (p *Plugin) Reset() error { p.workersPool = nil var err error - p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents) + p.workersPool, err = p.server.NewWorkerPool(context.Background(), p.cfg.Pool, map[string]string{RrMode: RrModeJobs}, p.collectJobsEvents, p.statsExporter.metricsCallback) if err != nil { return errors.E(op, err) } @@ -372,6 +423,7 @@ func (p *Plugin) Reset() error { func (p *Plugin) Push(j *job.Job) error { const op = errors.Op("jobs_plugin_push") + start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j.Options.Pipeline) if !ok { @@ -397,11 +449,27 @@ func (p *Plugin) Push(j *job.Job) error { err := d.Push(ctx, j) if err != nil { - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } - cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushOK, + ID: j.Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return nil } @@ -410,6 +478,7 @@ func (p *Plugin) PushBatch(j []*job.Job) error { const op = errors.Op("jobs_plugin_push") for i := 0; i < len(j); i++ { + start := time.Now() // get the pipeline for the job pipe, ok := p.pipelines.Load(j[i].Options.Pipeline) if !ok { @@ -432,6 +501,15 @@ func (p *Plugin) PushBatch(j []*job.Job) error { err := d.Push(ctx, j[i]) if err != nil { cancel() + p.events.Push(events.JobEvent{ + Event: events.EventPushError, + ID: j[i].Ident, + Pipeline: ppl.Name(), + Driver: ppl.Driver(), + Start: start, + Elapsed: time.Since(start), + Error: err, + }) return errors.E(op, err) } @@ -576,15 +654,15 @@ func (p *Plugin) collectJobsEvents(event interface{}) { case events.EventPipePaused: p.log.Info("pipeline paused", "pipeline", jev.Pipeline, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobStart: - p.log.Info("job started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processing started", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobOK: - p.log.Info("job OK", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Info("job processed without errors", "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushOK: p.log.Info("job pushed to the queue", "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPushError: - p.log.Error("job push error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job push error, job might be lost", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventJobError: - p.log.Error("job error", "error", jev.Error, "pipeline", jev.Pipeline, "ID", jev.ID, "Driver", jev.Driver, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) + p.log.Error("job processed with errors", "error", jev.Error, "ID", jev.ID, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPipeActive: p.log.Info("pipeline active", "pipeline", jev.Pipeline, "start", jev.Start.UTC(), "elapsed", jev.Elapsed) case events.EventPipeStopped: diff --git a/plugins/jobs/rpc.go b/plugins/jobs/rpc.go index 2750cd8f..94f903d5 100644 --- a/plugins/jobs/rpc.go +++ b/plugins/jobs/rpc.go @@ -129,6 +129,7 @@ func (r *rpc) Stat(_ *jobsv1beta.Empty, resp *jobsv1beta.Stats) error { Active: state[i].Active, Delayed: state[i].Delayed, Reserved: state[i].Reserved, + Ready: state[i].Ready, }) } |