diff options
author | Valery Piashchynski <[email protected]> | 2021-06-07 23:19:18 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-07 23:19:18 +0300 |
commit | 54a0a61485831c1848d55f65e2fb21c057ac6729 (patch) | |
tree | eee1c293f46cd65ec628da3a1b557cd142f3eabd /plugins | |
parent | 0a48df5e5e44aaefa503669d7fb75490f1be103f (diff) |
- Add metrics exporter for the workers
- Add metrics for the request duration and request count
- Add tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/http/metrics.go | 92 | ||||
-rw-r--r-- | plugins/http/plugin.go | 15 | ||||
-rw-r--r-- | plugins/metrics/plugin.go | 91 |
3 files changed, 156 insertions, 42 deletions
diff --git a/plugins/http/metrics.go b/plugins/http/metrics.go new file mode 100644 index 00000000..d7a9110b --- /dev/null +++ b/plugins/http/metrics.go @@ -0,0 +1,92 @@ +package http + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" +) + +func (p *Plugin) MetricsCollector() []prometheus.Collector { + // p - implements Exporter interface (workers) + // other - request duration and count + return []prometheus.Collector{p, p.requestsExporter.requestDuration, p.requestsExporter.requestCounter} +} + +func (p *Plugin) metricsCallback(event interface{}) { + switch e := event.(type) { + case handler.ResponseEvent: + p.requestsExporter.requestCounter.With(prometheus.Labels{ + "status": strconv.Itoa(e.Response.Status), + }).Inc() + + p.requestsExporter.requestDuration.With(prometheus.Labels{ + "status": strconv.Itoa(e.Response.Status), + }).Observe(e.Elapsed().Seconds()) + case handler.ErrorEvent: + p.requestsExporter.requestCounter.With(prometheus.Labels{ + "status": "500", + }).Inc() + + p.requestsExporter.requestDuration.With(prometheus.Labels{ + "status": "500", + }).Observe(e.Elapsed().Seconds()) + } +} + +type workersExporter struct { + wm *prometheus.Desc + workersMemory uint64 +} + +func newWorkersExporter() *workersExporter { + return &workersExporter{ + wm: prometheus.NewDesc("rr_http_workers_memory_bytes", "Memory usage by HTTP workers.", nil, nil), + workersMemory: 0, + } +} + +func (p *Plugin) Describe(d chan<- *prometheus.Desc) { + // send description + d <- p.workersExporter.wm +} + +func (p *Plugin) Collect(ch chan<- prometheus.Metric) { + // get the copy of the processes + workers := p.Workers() + + // cumulative RSS memory in bytes + var cum uint64 + + // collect the memory + for i := 0; i < len(workers); i++ { + cum += workers[i].MemoryUsage + } + + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(p.workersExporter.wm, prometheus.GaugeValue, float64(cum)) +} + +type requestsExporter struct { + requestCounter *prometheus.CounterVec + requestDuration *prometheus.HistogramVec +} + +func newRequestsExporter() *requestsExporter { + return &requestsExporter{ + requestCounter: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "rr_http_request_total", + Help: "Total number of handled http requests after server restart.", + }, + []string{"status"}, + ), + requestDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "rr_http_request_duration_seconds", + Help: "HTTP request duration.", + }, + []string{"status"}, + ), + } +} diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index ba83344a..bec01ac3 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -62,6 +62,10 @@ type Plugin struct { // servers RR handler handler *handler.Handler + // metrics + workersExporter *workersExporter + requestsExporter *requestsExporter + // servers http *http.Server https *http.Server @@ -102,6 +106,11 @@ func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server serv p.cfg.Env = make(map[string]string) } + // initialize workersExporter + p.workersExporter = newWorkersExporter() + // initialize requests exporter + p.requestsExporter = newRequestsExporter() + p.cfg.Env[RrMode] = "http" p.server = server @@ -159,7 +168,7 @@ func (p *Plugin) serve(errCh chan error) { return } - p.handler.AddListener(p.logCallback) + p.handler.AddListener(p.logCallback, p.metricsCallback) if p.cfg.EnableHTTP() { if p.cfg.EnableH2C() { @@ -341,7 +350,7 @@ func (p *Plugin) Reset() error { } p.log.Info("HTTP handler listeners successfully re-added") - p.handler.AddListener(p.logCallback) + p.handler.AddListener(p.logCallback, p.metricsCallback) p.log.Info("HTTP plugin successfully restarted") return nil @@ -386,7 +395,7 @@ func (p *Plugin) Ready() status.Status { workers := p.workers() for i := 0; i < len(workers); i++ { // If state of the worker is ready (at least 1) - // we assume, that plugin'p worker pool is ready + // we assume, that plugin's worker pool is ready if workers[i].State().Value() == worker.StateReady { return status.Status{ Code: http.StatusOK, diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go index 474bb21d..1f65e313 100644 --- a/plugins/metrics/plugin.go +++ b/plugins/metrics/plugin.go @@ -30,62 +30,81 @@ type Plugin struct { http *http.Server collectors sync.Map // all receivers are pointers registry *prometheus.Registry + + // prometheus Collectors + statProviders []StatProvider } // Init service. -func (m *Plugin) Init(cfg config.Configurer, log logger.Logger) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("metrics_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &m.cfg) + err := cfg.UnmarshalKey(PluginName, &p.cfg) if err != nil { return errors.E(op, errors.Disabled, err) } - m.cfg.InitDefaults() + p.cfg.InitDefaults() - m.log = log - m.registry = prometheus.NewRegistry() + p.log = log + p.registry = prometheus.NewRegistry() // Default - err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + err = p.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) if err != nil { return errors.E(op, err) } // Default - err = m.registry.Register(prometheus.NewGoCollector()) + err = p.registry.Register(prometheus.NewGoCollector()) if err != nil { return errors.E(op, err) } - collectors, err := m.cfg.getCollectors() + collectors, err := p.cfg.getCollectors() if err != nil { return errors.E(op, err) } // Register invocation will be later in the Serve method for k, v := range collectors { - m.collectors.Store(k, v) + p.collectors.Store(k, v) } + + p.statProviders = make([]StatProvider, 0, 2) + return nil } // Register new prometheus collector. -func (m *Plugin) Register(c prometheus.Collector) error { - return m.registry.Register(c) +func (p *Plugin) Register(c prometheus.Collector) error { + return p.registry.Register(c) } // Serve prometheus metrics service. -func (m *Plugin) Serve() chan error { +func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) - m.collectors.Range(func(key, value interface{}) bool { + + // register Collected stat providers + for i := 0; i < len(p.statProviders); i++ { + sp := p.statProviders[i] + for _, c := range sp.MetricsCollector() { + err := p.registry.Register(c) + if err != nil { + errCh <- err + return errCh + } + } + } + + p.collectors.Range(func(key, value interface{}) bool { // key - name // value - prometheus.Collector c := value.(prometheus.Collector) - if err := m.registry.Register(c); err != nil { + if err := p.registry.Register(c); err != nil { errCh <- err return false } @@ -141,9 +160,9 @@ func (m *Plugin) Serve() chan error { DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) - m.http = &http.Server{ - Addr: m.cfg.Address, - Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}), + p.http = &http.Server{ + Addr: p.cfg.Address, + Handler: promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{}), IdleTimeout: time.Hour * 24, ReadTimeout: time.Minute * 60, MaxHeaderBytes: maxHeaderSize, @@ -163,7 +182,7 @@ func (m *Plugin) Serve() chan error { } go func() { - err := m.http.ListenAndServe() + err := p.http.ListenAndServe() if err != nil && err != http.ErrServerClosed { errCh <- err return @@ -174,55 +193,49 @@ func (m *Plugin) Serve() chan error { } // Stop prometheus metrics service. -func (m *Plugin) Stop() error { - m.mu.Lock() - defer m.mu.Unlock() +func (p *Plugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() - if m.http != nil { + if p.http != nil { // timeout is 10 seconds ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - err := m.http.Shutdown(ctx) + err := p.http.Shutdown(ctx) if err != nil { // Function should be Stop() error - m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err)) + p.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err)) } } return nil } // Collects used to collect all plugins which implement metrics.StatProvider interface (and Named) -func (m *Plugin) Collects() []interface{} { +func (p *Plugin) Collects() []interface{} { return []interface{}{ - m.AddStatProvider, + p.AddStatProvider, } } // AddStatProvider adds a metrics provider -func (m *Plugin) AddStatProvider(stat StatProvider) error { - for _, c := range stat.MetricsCollector() { - err := m.registry.Register(c) - if err != nil { - return err - } - } +func (p *Plugin) AddStatProvider(stat StatProvider) error { + p.statProviders = append(p.statProviders, stat) return nil } // Name returns user friendly plugin name -func (m *Plugin) Name() string { +func (p *Plugin) Name() string { return PluginName } // RPC interface satisfaction -func (m *Plugin) RPC() interface{} { +func (p *Plugin) RPC() interface{} { return &rpcServer{ - svc: m, - log: m.log, + svc: p, + log: p.log, } } // Available interface implementation -func (m *Plugin) Available() { -} +func (p *Plugin) Available() {} |