From 54a0a61485831c1848d55f65e2fb21c057ac6729 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 7 Jun 2021 23:19:18 +0300 Subject: - Add metrics exporter for the workers - Add metrics for the request duration and request count - Add tests Signed-off-by: Valery Piashchynski --- plugins/metrics/plugin.go | 91 +++++++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 39 deletions(-) (limited to 'plugins/metrics/plugin.go') diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go index 474bb21d..1f65e313 100644 --- a/plugins/metrics/plugin.go +++ b/plugins/metrics/plugin.go @@ -30,62 +30,81 @@ type Plugin struct { http *http.Server collectors sync.Map // all receivers are pointers registry *prometheus.Registry + + // prometheus Collectors + statProviders []StatProvider } // Init service. -func (m *Plugin) Init(cfg config.Configurer, log logger.Logger) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("metrics_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &m.cfg) + err := cfg.UnmarshalKey(PluginName, &p.cfg) if err != nil { return errors.E(op, errors.Disabled, err) } - m.cfg.InitDefaults() + p.cfg.InitDefaults() - m.log = log - m.registry = prometheus.NewRegistry() + p.log = log + p.registry = prometheus.NewRegistry() // Default - err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + err = p.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) if err != nil { return errors.E(op, err) } // Default - err = m.registry.Register(prometheus.NewGoCollector()) + err = p.registry.Register(prometheus.NewGoCollector()) if err != nil { return errors.E(op, err) } - collectors, err := m.cfg.getCollectors() + collectors, err := p.cfg.getCollectors() if err != nil { return errors.E(op, err) } // Register invocation will be later in the Serve method for k, v := range collectors { - m.collectors.Store(k, v) + p.collectors.Store(k, v) } + + p.statProviders = make([]StatProvider, 0, 2) + return nil } // Register new prometheus collector. -func (m *Plugin) Register(c prometheus.Collector) error { - return m.registry.Register(c) +func (p *Plugin) Register(c prometheus.Collector) error { + return p.registry.Register(c) } // Serve prometheus metrics service. -func (m *Plugin) Serve() chan error { +func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) - m.collectors.Range(func(key, value interface{}) bool { + + // register Collected stat providers + for i := 0; i < len(p.statProviders); i++ { + sp := p.statProviders[i] + for _, c := range sp.MetricsCollector() { + err := p.registry.Register(c) + if err != nil { + errCh <- err + return errCh + } + } + } + + p.collectors.Range(func(key, value interface{}) bool { // key - name // value - prometheus.Collector c := value.(prometheus.Collector) - if err := m.registry.Register(c); err != nil { + if err := p.registry.Register(c); err != nil { errCh <- err return false } @@ -141,9 +160,9 @@ func (m *Plugin) Serve() chan error { DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) - m.http = &http.Server{ - Addr: m.cfg.Address, - Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}), + p.http = &http.Server{ + Addr: p.cfg.Address, + Handler: promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{}), IdleTimeout: time.Hour * 24, ReadTimeout: time.Minute * 60, MaxHeaderBytes: maxHeaderSize, @@ -163,7 +182,7 @@ func (m *Plugin) Serve() chan error { } go func() { - err := m.http.ListenAndServe() + err := p.http.ListenAndServe() if err != nil && err != http.ErrServerClosed { errCh <- err return @@ -174,55 +193,49 @@ func (m *Plugin) Serve() chan error { } // Stop prometheus metrics service. -func (m *Plugin) Stop() error { - m.mu.Lock() - defer m.mu.Unlock() +func (p *Plugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() - if m.http != nil { + if p.http != nil { // timeout is 10 seconds ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - err := m.http.Shutdown(ctx) + err := p.http.Shutdown(ctx) if err != nil { // Function should be Stop() error - m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err)) + p.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err)) } } return nil } // Collects used to collect all plugins which implement metrics.StatProvider interface (and Named) -func (m *Plugin) Collects() []interface{} { +func (p *Plugin) Collects() []interface{} { return []interface{}{ - m.AddStatProvider, + p.AddStatProvider, } } // AddStatProvider adds a metrics provider -func (m *Plugin) AddStatProvider(stat StatProvider) error { - for _, c := range stat.MetricsCollector() { - err := m.registry.Register(c) - if err != nil { - return err - } - } +func (p *Plugin) AddStatProvider(stat StatProvider) error { + p.statProviders = append(p.statProviders, stat) return nil } // Name returns user friendly plugin name -func (m *Plugin) Name() string { +func (p *Plugin) Name() string { return PluginName } // RPC interface satisfaction -func (m *Plugin) RPC() interface{} { +func (p *Plugin) RPC() interface{} { return &rpcServer{ - svc: m, - log: m.log, + svc: p, + log: p.log, } } // Available interface implementation -func (m *Plugin) Available() { -} +func (p *Plugin) Available() {} -- cgit v1.2.3