summaryrefslogtreecommitdiff
path: root/plugins/jobs/metrics.go
blob: 38d0bcfb9da9e6c59454d52a206b5b0a0dd73148 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package jobs

import (
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/spiral/roadrunner/v2/pkg/events"
	"github.com/spiral/roadrunner/v2/plugins/informer"
)

func (p *Plugin) MetricsCollector() []prometheus.Collector {
	// p - implements Exporter interface (workers)
	// other - request duration and count
	return []prometheus.Collector{p.statsExporter}
}

const (
	namespace = "rr_jobs"
)

type statsExporter struct {
	workers       informer.Informer
	workersMemory uint64
	jobsOk        uint64
	pushOk        uint64
	jobsErr       uint64
	pushErr       uint64
}

var (
	worker  = prometheus.NewDesc("workers_memory_bytes", "Memory usage by JOBS workers.", nil, nil)
	pushOk  = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_ok"), "Number of job push.", nil, nil)
	pushErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "push_err"), "Number of jobs push which was failed.", nil, nil)
	jobsErr = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_err"), "Number of jobs error while processing in the worker.", nil, nil)
	jobsOk  = prometheus.NewDesc(prometheus.BuildFQName(namespace, "", "jobs_ok"), "Number of successfully processed jobs.", nil, nil)
)

func newStatsExporter(stats informer.Informer) *statsExporter {
	return &statsExporter{
		workers:       stats,
		workersMemory: 0,
		jobsOk:        0,
		pushOk:        0,
		jobsErr:       0,
		pushErr:       0,
	}
}

func (se *statsExporter) metricsCallback(event interface{}) {
	if jev, ok := event.(events.JobEvent); ok {
		switch jev.Event { //nolint:exhaustive
		case events.EventJobOK:
			atomic.AddUint64(&se.jobsOk, 1)
		case events.EventPushOK:
			atomic.AddUint64(&se.pushOk, 1)
		case events.EventPushError:
			atomic.AddUint64(&se.pushErr, 1)
		case events.EventJobError:
			atomic.AddUint64(&se.jobsErr, 1)
		}
	}
}

func (se *statsExporter) Describe(d chan<- *prometheus.Desc) {
	// send description
	d <- worker
	d <- pushErr
	d <- pushOk
	d <- jobsErr
	d <- jobsOk
}

func (se *statsExporter) Collect(ch chan<- prometheus.Metric) {
	// get the copy of the processes
	workers := se.workers.Workers()

	// cumulative RSS memory in bytes
	var cum uint64

	// collect the memory
	for i := 0; i < len(workers); i++ {
		cum += workers[i].MemoryUsage
	}

	// send the values to the prometheus
	ch <- prometheus.MustNewConstMetric(worker, prometheus.GaugeValue, float64(cum))
	// send the values to the prometheus
	ch <- prometheus.MustNewConstMetric(jobsOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsOk)))
	ch <- prometheus.MustNewConstMetric(jobsErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.jobsErr)))
	ch <- prometheus.MustNewConstMetric(pushOk, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushOk)))
	ch <- prometheus.MustNewConstMetric(pushErr, prometheus.GaugeValue, float64(atomic.LoadUint64(&se.pushErr)))
}