summaryrefslogtreecommitdiff
path: root/plugins/metrics/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-07 23:19:18 +0300
committerValery Piashchynski <[email protected]>2021-06-07 23:19:18 +0300
commit54a0a61485831c1848d55f65e2fb21c057ac6729 (patch)
treeeee1c293f46cd65ec628da3a1b557cd142f3eabd /plugins/metrics/plugin.go
parent0a48df5e5e44aaefa503669d7fb75490f1be103f (diff)
- Add metrics exporter for the workers
- Add metrics for the request duration and request count - Add tests Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/metrics/plugin.go')
-rw-r--r--plugins/metrics/plugin.go91
1 files changed, 52 insertions, 39 deletions
diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go
index 474bb21d..1f65e313 100644
--- a/plugins/metrics/plugin.go
+++ b/plugins/metrics/plugin.go
@@ -30,62 +30,81 @@ type Plugin struct {
http *http.Server
collectors sync.Map // all receivers are pointers
registry *prometheus.Registry
+
+ // prometheus Collectors
+ statProviders []StatProvider
}
// Init service.
-func (m *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("metrics_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &m.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
- m.cfg.InitDefaults()
+ p.cfg.InitDefaults()
- m.log = log
- m.registry = prometheus.NewRegistry()
+ p.log = log
+ p.registry = prometheus.NewRegistry()
// Default
- err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
+ err = p.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
if err != nil {
return errors.E(op, err)
}
// Default
- err = m.registry.Register(prometheus.NewGoCollector())
+ err = p.registry.Register(prometheus.NewGoCollector())
if err != nil {
return errors.E(op, err)
}
- collectors, err := m.cfg.getCollectors()
+ collectors, err := p.cfg.getCollectors()
if err != nil {
return errors.E(op, err)
}
// Register invocation will be later in the Serve method
for k, v := range collectors {
- m.collectors.Store(k, v)
+ p.collectors.Store(k, v)
}
+
+ p.statProviders = make([]StatProvider, 0, 2)
+
return nil
}
// Register new prometheus collector.
-func (m *Plugin) Register(c prometheus.Collector) error {
- return m.registry.Register(c)
+func (p *Plugin) Register(c prometheus.Collector) error {
+ return p.registry.Register(c)
}
// Serve prometheus metrics service.
-func (m *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
- m.collectors.Range(func(key, value interface{}) bool {
+
+ // register Collected stat providers
+ for i := 0; i < len(p.statProviders); i++ {
+ sp := p.statProviders[i]
+ for _, c := range sp.MetricsCollector() {
+ err := p.registry.Register(c)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ }
+ }
+
+ p.collectors.Range(func(key, value interface{}) bool {
// key - name
// value - prometheus.Collector
c := value.(prometheus.Collector)
- if err := m.registry.Register(c); err != nil {
+ if err := p.registry.Register(c); err != nil {
errCh <- err
return false
}
@@ -141,9 +160,9 @@ func (m *Plugin) Serve() chan error {
DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...)
DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
- m.http = &http.Server{
- Addr: m.cfg.Address,
- Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
+ p.http = &http.Server{
+ Addr: p.cfg.Address,
+ Handler: promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{}),
IdleTimeout: time.Hour * 24,
ReadTimeout: time.Minute * 60,
MaxHeaderBytes: maxHeaderSize,
@@ -163,7 +182,7 @@ func (m *Plugin) Serve() chan error {
}
go func() {
- err := m.http.ListenAndServe()
+ err := p.http.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
errCh <- err
return
@@ -174,55 +193,49 @@ func (m *Plugin) Serve() chan error {
}
// Stop prometheus metrics service.
-func (m *Plugin) Stop() error {
- m.mu.Lock()
- defer m.mu.Unlock()
+func (p *Plugin) Stop() error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
- if m.http != nil {
+ if p.http != nil {
// timeout is 10 seconds
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
- err := m.http.Shutdown(ctx)
+ err := p.http.Shutdown(ctx)
if err != nil {
// Function should be Stop() error
- m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
+ p.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
}
}
return nil
}
// Collects used to collect all plugins which implement metrics.StatProvider interface (and Named)
-func (m *Plugin) Collects() []interface{} {
+func (p *Plugin) Collects() []interface{} {
return []interface{}{
- m.AddStatProvider,
+ p.AddStatProvider,
}
}
// AddStatProvider adds a metrics provider
-func (m *Plugin) AddStatProvider(stat StatProvider) error {
- for _, c := range stat.MetricsCollector() {
- err := m.registry.Register(c)
- if err != nil {
- return err
- }
- }
+func (p *Plugin) AddStatProvider(stat StatProvider) error {
+ p.statProviders = append(p.statProviders, stat)
return nil
}
// Name returns user friendly plugin name
-func (m *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// RPC interface satisfaction
-func (m *Plugin) RPC() interface{} {
+func (p *Plugin) RPC() interface{} {
return &rpcServer{
- svc: m,
- log: m.log,
+ svc: p,
+ log: p.log,
}
}
// Available interface implementation
-func (m *Plugin) Available() {
-}
+func (p *Plugin) Available() {}