summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-07 23:19:18 +0300
committerValery Piashchynski <[email protected]>2021-06-07 23:19:18 +0300
commit54a0a61485831c1848d55f65e2fb21c057ac6729 (patch)
treeeee1c293f46cd65ec628da3a1b557cd142f3eabd /plugins
parent0a48df5e5e44aaefa503669d7fb75490f1be103f (diff)
- Add metrics exporter for the workers
- Add metrics for the request duration and request count - Add tests Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/http/metrics.go92
-rw-r--r--plugins/http/plugin.go15
-rw-r--r--plugins/metrics/plugin.go91
3 files changed, 156 insertions, 42 deletions
diff --git a/plugins/http/metrics.go b/plugins/http/metrics.go
new file mode 100644
index 00000000..d7a9110b
--- /dev/null
+++ b/plugins/http/metrics.go
@@ -0,0 +1,92 @@
+package http
+
+import (
+ "strconv"
+
+ "github.com/prometheus/client_golang/prometheus"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
+)
+
+func (p *Plugin) MetricsCollector() []prometheus.Collector {
+ // p - implements Exporter interface (workers)
+ // other - request duration and count
+ return []prometheus.Collector{p, p.requestsExporter.requestDuration, p.requestsExporter.requestCounter}
+}
+
+func (p *Plugin) metricsCallback(event interface{}) {
+ switch e := event.(type) {
+ case handler.ResponseEvent:
+ p.requestsExporter.requestCounter.With(prometheus.Labels{
+ "status": strconv.Itoa(e.Response.Status),
+ }).Inc()
+
+ p.requestsExporter.requestDuration.With(prometheus.Labels{
+ "status": strconv.Itoa(e.Response.Status),
+ }).Observe(e.Elapsed().Seconds())
+ case handler.ErrorEvent:
+ p.requestsExporter.requestCounter.With(prometheus.Labels{
+ "status": "500",
+ }).Inc()
+
+ p.requestsExporter.requestDuration.With(prometheus.Labels{
+ "status": "500",
+ }).Observe(e.Elapsed().Seconds())
+ }
+}
+
+type workersExporter struct {
+ wm *prometheus.Desc
+ workersMemory uint64
+}
+
+func newWorkersExporter() *workersExporter {
+ return &workersExporter{
+ wm: prometheus.NewDesc("rr_http_workers_memory_bytes", "Memory usage by HTTP workers.", nil, nil),
+ workersMemory: 0,
+ }
+}
+
+func (p *Plugin) Describe(d chan<- *prometheus.Desc) {
+ // send description
+ d <- p.workersExporter.wm
+}
+
+func (p *Plugin) Collect(ch chan<- prometheus.Metric) {
+ // get the copy of the processes
+ workers := p.Workers()
+
+ // cumulative RSS memory in bytes
+ var cum uint64
+
+ // collect the memory
+ for i := 0; i < len(workers); i++ {
+ cum += workers[i].MemoryUsage
+ }
+
+ // send the values to the prometheus
+ ch <- prometheus.MustNewConstMetric(p.workersExporter.wm, prometheus.GaugeValue, float64(cum))
+}
+
+type requestsExporter struct {
+ requestCounter *prometheus.CounterVec
+ requestDuration *prometheus.HistogramVec
+}
+
+func newRequestsExporter() *requestsExporter {
+ return &requestsExporter{
+ requestCounter: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "rr_http_request_total",
+ Help: "Total number of handled http requests after server restart.",
+ },
+ []string{"status"},
+ ),
+ requestDuration: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Name: "rr_http_request_duration_seconds",
+ Help: "HTTP request duration.",
+ },
+ []string{"status"},
+ ),
+ }
+}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index ba83344a..bec01ac3 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -62,6 +62,10 @@ type Plugin struct {
// servers RR handler
handler *handler.Handler
+ // metrics
+ workersExporter *workersExporter
+ requestsExporter *requestsExporter
+
// servers
http *http.Server
https *http.Server
@@ -102,6 +106,11 @@ func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server serv
p.cfg.Env = make(map[string]string)
}
+ // initialize workersExporter
+ p.workersExporter = newWorkersExporter()
+ // initialize requests exporter
+ p.requestsExporter = newRequestsExporter()
+
p.cfg.Env[RrMode] = "http"
p.server = server
@@ -159,7 +168,7 @@ func (p *Plugin) serve(errCh chan error) {
return
}
- p.handler.AddListener(p.logCallback)
+ p.handler.AddListener(p.logCallback, p.metricsCallback)
if p.cfg.EnableHTTP() {
if p.cfg.EnableH2C() {
@@ -341,7 +350,7 @@ func (p *Plugin) Reset() error {
}
p.log.Info("HTTP handler listeners successfully re-added")
- p.handler.AddListener(p.logCallback)
+ p.handler.AddListener(p.logCallback, p.metricsCallback)
p.log.Info("HTTP plugin successfully restarted")
return nil
@@ -386,7 +395,7 @@ func (p *Plugin) Ready() status.Status {
workers := p.workers()
for i := 0; i < len(workers); i++ {
// If state of the worker is ready (at least 1)
- // we assume, that plugin'p worker pool is ready
+ // we assume, that plugin's worker pool is ready
if workers[i].State().Value() == worker.StateReady {
return status.Status{
Code: http.StatusOK,
diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go
index 474bb21d..1f65e313 100644
--- a/plugins/metrics/plugin.go
+++ b/plugins/metrics/plugin.go
@@ -30,62 +30,81 @@ type Plugin struct {
http *http.Server
collectors sync.Map // all receivers are pointers
registry *prometheus.Registry
+
+ // prometheus Collectors
+ statProviders []StatProvider
}
// Init service.
-func (m *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("metrics_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &m.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
- m.cfg.InitDefaults()
+ p.cfg.InitDefaults()
- m.log = log
- m.registry = prometheus.NewRegistry()
+ p.log = log
+ p.registry = prometheus.NewRegistry()
// Default
- err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
+ err = p.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
if err != nil {
return errors.E(op, err)
}
// Default
- err = m.registry.Register(prometheus.NewGoCollector())
+ err = p.registry.Register(prometheus.NewGoCollector())
if err != nil {
return errors.E(op, err)
}
- collectors, err := m.cfg.getCollectors()
+ collectors, err := p.cfg.getCollectors()
if err != nil {
return errors.E(op, err)
}
// Register invocation will be later in the Serve method
for k, v := range collectors {
- m.collectors.Store(k, v)
+ p.collectors.Store(k, v)
}
+
+ p.statProviders = make([]StatProvider, 0, 2)
+
return nil
}
// Register new prometheus collector.
-func (m *Plugin) Register(c prometheus.Collector) error {
- return m.registry.Register(c)
+func (p *Plugin) Register(c prometheus.Collector) error {
+ return p.registry.Register(c)
}
// Serve prometheus metrics service.
-func (m *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
- m.collectors.Range(func(key, value interface{}) bool {
+
+ // register Collected stat providers
+ for i := 0; i < len(p.statProviders); i++ {
+ sp := p.statProviders[i]
+ for _, c := range sp.MetricsCollector() {
+ err := p.registry.Register(c)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ }
+ }
+
+ p.collectors.Range(func(key, value interface{}) bool {
// key - name
// value - prometheus.Collector
c := value.(prometheus.Collector)
- if err := m.registry.Register(c); err != nil {
+ if err := p.registry.Register(c); err != nil {
errCh <- err
return false
}
@@ -141,9 +160,9 @@ func (m *Plugin) Serve() chan error {
DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...)
DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
- m.http = &http.Server{
- Addr: m.cfg.Address,
- Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
+ p.http = &http.Server{
+ Addr: p.cfg.Address,
+ Handler: promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{}),
IdleTimeout: time.Hour * 24,
ReadTimeout: time.Minute * 60,
MaxHeaderBytes: maxHeaderSize,
@@ -163,7 +182,7 @@ func (m *Plugin) Serve() chan error {
}
go func() {
- err := m.http.ListenAndServe()
+ err := p.http.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
errCh <- err
return
@@ -174,55 +193,49 @@ func (m *Plugin) Serve() chan error {
}
// Stop prometheus metrics service.
-func (m *Plugin) Stop() error {
- m.mu.Lock()
- defer m.mu.Unlock()
+func (p *Plugin) Stop() error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
- if m.http != nil {
+ if p.http != nil {
// timeout is 10 seconds
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
- err := m.http.Shutdown(ctx)
+ err := p.http.Shutdown(ctx)
if err != nil {
// Function should be Stop() error
- m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
+ p.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
}
}
return nil
}
// Collects used to collect all plugins which implement metrics.StatProvider interface (and Named)
-func (m *Plugin) Collects() []interface{} {
+func (p *Plugin) Collects() []interface{} {
return []interface{}{
- m.AddStatProvider,
+ p.AddStatProvider,
}
}
// AddStatProvider adds a metrics provider
-func (m *Plugin) AddStatProvider(stat StatProvider) error {
- for _, c := range stat.MetricsCollector() {
- err := m.registry.Register(c)
- if err != nil {
- return err
- }
- }
+func (p *Plugin) AddStatProvider(stat StatProvider) error {
+ p.statProviders = append(p.statProviders, stat)
return nil
}
// Name returns user friendly plugin name
-func (m *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// RPC interface satisfaction
-func (m *Plugin) RPC() interface{} {
+func (p *Plugin) RPC() interface{} {
return &rpcServer{
- svc: m,
- log: m.log,
+ svc: p,
+ log: p.log,
}
}
// Available interface implementation
-func (m *Plugin) Available() {
-}
+func (p *Plugin) Available() {}