diff options
Diffstat (limited to 'plugins/jobs/metrics.go')
-rw-r--r-- | plugins/jobs/metrics.go | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/plugins/jobs/metrics.go b/plugins/jobs/metrics.go new file mode 100644 index 00000000..61856a10 --- /dev/null +++ b/plugins/jobs/metrics.go @@ -0,0 +1,91 @@ +package jobs + +import ( + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/roadrunner/v2/pkg/events" + "github.com/spiral/roadrunner/v2/plugins/informer" +) + +func (p *Plugin) MetricsCollector() []prometheus.Collector { + // p - implements Exporter interface (workers) + // other - request duration and count + return []prometheus.Collector{p.statsExporter} +} + +const ( + namespace = "rr_jobs" +) + +type statsExporter struct { + workers informer.Informer + workersMemory uint64 + jobsOk uint64 + pushOk uint64 + jobsErr uint64 + pushErr uint64 +} + +var ( + worker = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil) + pushOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil) + pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil) + jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil) + jobsOk = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil) +) + +func newStatsExporter(stats informer.Informer) *statsExporter { + return &statsExporter{ + workers: stats, + workersMemory: 0, + jobsOk: 0, + pushOk: 0, + jobsErr: 0, + pushErr: 0, + } +} + +func (se *statsExporter) metricsCallback(event interface{}) { + if jev, ok := event.(events.JobEvent); ok { + switch jev.Event { //nolint:exhaustive + case events.EventJobOK: + atomic.AddUint64(&se.jobsOk, 1) + case events.EventPushOK: + atomic.AddUint64(&se.pushOk, 1) + case events.EventPushError: + atomic.AddUint64(&se.pushErr, 1) + case events.EventJobError: + atomic.AddUint64(&se.jobsErr, 1) + } + } +} + +func (se *statsExporter) Describe(d chan<- *prometheus.Desc) { + // send description + d <- pushErr + d <- pushOk + d <- jobsErr + d <- jobsOk +} + +func (se *statsExporter) Collect(ch chan<- prometheus.Metric) { + // get the copy of the processes + workers := se.workers.Workers() + + // cumulative RSS memory in bytes + var cum uint64 + + // collect the memory + for i := 0; i < len(workers); i++ { + cum += workers[i].MemoryUsage + } + + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum)) + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk))) + ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr))) + ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk))) + ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr))) +} |