summaryrefslogtreecommitdiff
path: root/plugins/metrics
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-26 00:47:21 +0300
committerGitHub <[email protected]>2020-12-26 00:47:21 +0300
commit566d7f4c95eb5dedcb2da5afcda4bbea8eba077f (patch)
tree0007a6b8c8ac9e7d31b8a5f3f7f27669c860d261 /plugins/metrics
parent1bc3db2ea9b95edd0101676d7bfd75df3782c3bd (diff)
parent7a0dee1a416705c621edbf50e1f43fb39845348f (diff)
Merge pull request #463 from spiral/experiment/core_pluginsv2.0.0-beta1
[RR2] Plugins
Diffstat (limited to 'plugins/metrics')
-rw-r--r--plugins/metrics/config.go138
-rw-r--r--plugins/metrics/config_test.go89
-rw-r--r--plugins/metrics/doc.go1
-rw-r--r--plugins/metrics/interface.go7
-rw-r--r--plugins/metrics/plugin.go229
-rw-r--r--plugins/metrics/rpc.go294
6 files changed, 758 insertions, 0 deletions
diff --git a/plugins/metrics/config.go b/plugins/metrics/config.go
new file mode 100644
index 00000000..9459bc9b
--- /dev/null
+++ b/plugins/metrics/config.go
@@ -0,0 +1,138 @@
+package metrics
+
+import (
+ "fmt"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// Config configures metrics service.
+type Config struct {
+ // Address to listen
+ Address string
+
+ // Collect define application specific metrics.
+ Collect map[string]Collector
+}
+
+type NamedCollector struct {
+ // Name of the collector
+ Name string `json:"name"`
+
+ // Collector structure
+ Collector `json:"collector"`
+}
+
+// CollectorType represents prometheus collector types
+type CollectorType string
+
+const (
+ // Histogram type
+ Histogram CollectorType = "histogram"
+
+ // Gauge type
+ Gauge CollectorType = "gauge"
+
+ // Counter type
+ Counter CollectorType = "counter"
+
+ // Summary type
+ Summary CollectorType = "summary"
+)
+
+// Collector describes single application specific metric.
+type Collector struct {
+ // Namespace of the metric.
+ Namespace string `json:"namespace"`
+ // Subsystem of the metric.
+ Subsystem string `json:"subsystem"`
+ // Collector type (histogram, gauge, counter, summary).
+ Type CollectorType `json:"type"`
+ // Help of collector.
+ Help string `json:"help"`
+ // Labels for vectorized metrics.
+ Labels []string `json:"labels"`
+ // Buckets for histogram metric.
+ Buckets []float64 `json:"buckets"`
+ // Objectives for the summary opts
+ Objectives map[float64]float64 `json:"objectives"`
+}
+
+// register application specific metrics.
+func (c *Config) getCollectors() (map[string]prometheus.Collector, error) {
+ if c.Collect == nil {
+ return nil, nil
+ }
+
+ collectors := make(map[string]prometheus.Collector)
+
+ for name, m := range c.Collect {
+ var collector prometheus.Collector
+ switch m.Type {
+ case Histogram:
+ opts := prometheus.HistogramOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ Buckets: m.Buckets,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewHistogramVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewHistogram(opts)
+ }
+ case Gauge:
+ opts := prometheus.GaugeOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewGaugeVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewGauge(opts)
+ }
+ case Counter:
+ opts := prometheus.CounterOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewCounterVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewCounter(opts)
+ }
+ case Summary:
+ opts := prometheus.SummaryOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ Objectives: m.Objectives,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewSummaryVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewSummary(opts)
+ }
+ default:
+ return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name)
+ }
+
+ collectors[name] = collector
+ }
+
+ return collectors, nil
+}
+
+func (c *Config) InitDefaults() {
+
+}
diff --git a/plugins/metrics/config_test.go b/plugins/metrics/config_test.go
new file mode 100644
index 00000000..665ec9cd
--- /dev/null
+++ b/plugins/metrics/config_test.go
@@ -0,0 +1,89 @@
+package metrics
+
+import (
+ "bytes"
+ "testing"
+
+ j "github.com/json-iterator/go"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/stretchr/testify/assert"
+)
+
+var json = j.ConfigCompatibleWithStandardLibrary
+
+func Test_Config_Hydrate_Error1(t *testing.T) {
+ cfg := `{"request": {"From": "Something"}}`
+ c := &Config{}
+ f := new(bytes.Buffer)
+ f.WriteString(cfg)
+
+ err := json.Unmarshal(f.Bytes(), &c)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func Test_Config_Hydrate_Error2(t *testing.T) {
+ cfg := `{"dir": "/dir/"`
+ c := &Config{}
+
+ f := new(bytes.Buffer)
+ f.WriteString(cfg)
+
+ err := json.Unmarshal(f.Bytes(), &c)
+ assert.Error(t, err)
+}
+
+func Test_Config_Metrics(t *testing.T) {
+ cfg := `{
+"collect":{
+ "metric1":{"type": "gauge"},
+ "metric2":{ "type": "counter"},
+ "metric3":{"type": "summary"},
+ "metric4":{"type": "histogram"}
+}
+}`
+ c := &Config{}
+ f := new(bytes.Buffer)
+ f.WriteString(cfg)
+
+ err := json.Unmarshal(f.Bytes(), &c)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ m, err := c.getCollectors()
+ assert.NoError(t, err)
+
+ assert.IsType(t, prometheus.NewGauge(prometheus.GaugeOpts{}), m["metric1"])
+ assert.IsType(t, prometheus.NewCounter(prometheus.CounterOpts{}), m["metric2"])
+ assert.IsType(t, prometheus.NewSummary(prometheus.SummaryOpts{}), m["metric3"])
+ assert.IsType(t, prometheus.NewHistogram(prometheus.HistogramOpts{}), m["metric4"])
+}
+
+func Test_Config_MetricsVector(t *testing.T) {
+ cfg := `{
+"collect":{
+ "metric1":{"type": "gauge","labels":["label"]},
+ "metric2":{ "type": "counter","labels":["label"]},
+ "metric3":{"type": "summary","labels":["label"]},
+ "metric4":{"type": "histogram","labels":["label"]}
+}
+}`
+ c := &Config{}
+ f := new(bytes.Buffer)
+ f.WriteString(cfg)
+
+ err := json.Unmarshal(f.Bytes(), &c)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ m, err := c.getCollectors()
+ assert.NoError(t, err)
+
+ assert.IsType(t, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{}), m["metric1"])
+ assert.IsType(t, prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{}), m["metric2"])
+ assert.IsType(t, prometheus.NewSummaryVec(prometheus.SummaryOpts{}, []string{}), m["metric3"])
+ assert.IsType(t, prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{}), m["metric4"])
+}
diff --git a/plugins/metrics/doc.go b/plugins/metrics/doc.go
new file mode 100644
index 00000000..1abe097a
--- /dev/null
+++ b/plugins/metrics/doc.go
@@ -0,0 +1 @@
+package metrics
diff --git a/plugins/metrics/interface.go b/plugins/metrics/interface.go
new file mode 100644
index 00000000..87ba4017
--- /dev/null
+++ b/plugins/metrics/interface.go
@@ -0,0 +1,7 @@
+package metrics
+
+import "github.com/prometheus/client_golang/prometheus"
+
+type StatProvider interface {
+ MetricsCollector() []prometheus.Collector
+}
diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go
new file mode 100644
index 00000000..fb9096a1
--- /dev/null
+++ b/plugins/metrics/plugin.go
@@ -0,0 +1,229 @@
+package metrics
+
+import (
+ "context"
+ "crypto/tls"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/spiral/endure"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "golang.org/x/sys/cpu"
+)
+
+const (
+ // PluginName declares plugin name.
+ PluginName = "metrics"
+ // maxHeaderSize declares max header size for prometheus server
+ maxHeaderSize = 1024 * 1024 * 100 // 104MB
+)
+
+type statsProvider struct {
+ collectors []prometheus.Collector
+ name string
+}
+
+// Plugin to manage application metrics using Prometheus.
+type Plugin struct {
+ cfg Config
+ log logger.Logger
+ mu sync.Mutex // all receivers are pointers
+ http *http.Server
+ collectors sync.Map // all receivers are pointers
+ registry *prometheus.Registry
+}
+
+// Init service.
+func (m *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("metrics init")
+ err := cfg.UnmarshalKey(PluginName, &m.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ // TODO figure out what is Init
+ m.cfg.InitDefaults()
+
+ m.log = log
+ m.registry = prometheus.NewRegistry()
+
+ // Default
+ err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Default
+ err = m.registry.Register(prometheus.NewGoCollector())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ collectors, err := m.cfg.getCollectors()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Register invocation will be later in the Serve method
+ for k, v := range collectors {
+ m.collectors.Store(k, statsProvider{
+ collectors: []prometheus.Collector{v},
+ name: k,
+ })
+ }
+ return nil
+}
+
+// Register new prometheus collector.
+func (m *Plugin) Register(c prometheus.Collector) error {
+ return m.registry.Register(c)
+}
+
+// Serve prometheus metrics service.
+func (m *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ m.collectors.Range(func(key, value interface{}) bool {
+ // key - name
+ // value - statsProvider struct
+ c := value.(statsProvider)
+ for _, v := range c.collectors {
+ if err := m.registry.Register(v); err != nil {
+ errCh <- err
+ return false
+ }
+ }
+
+ return true
+ })
+
+ var topCipherSuites []uint16
+ var defaultCipherSuitesTLS13 []uint16
+
+ hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ
+ hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL
+ // Keep in sync with crypto/aes/cipher_s390x.go.
+ hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM)
+
+ hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X
+
+ if hasGCMAsm {
+ // If AES-GCM hardware is provided then prioritise AES-GCM
+ // cipher suites.
+ topCipherSuites = []uint16{
+ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
+ }
+ defaultCipherSuitesTLS13 = []uint16{
+ tls.TLS_AES_128_GCM_SHA256,
+ tls.TLS_CHACHA20_POLY1305_SHA256,
+ tls.TLS_AES_256_GCM_SHA384,
+ }
+ } else {
+ // Without AES-GCM hardware, we put the ChaCha20-Poly1305
+ // cipher suites first.
+ topCipherSuites = []uint16{
+ tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ }
+ defaultCipherSuitesTLS13 = []uint16{
+ tls.TLS_CHACHA20_POLY1305_SHA256,
+ tls.TLS_AES_128_GCM_SHA256,
+ tls.TLS_AES_256_GCM_SHA384,
+ }
+ }
+
+ DefaultCipherSuites := make([]uint16, 0, 22)
+ DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...)
+ DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
+
+ m.http = &http.Server{
+ Addr: m.cfg.Address,
+ Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
+ IdleTimeout: time.Hour * 24,
+ ReadTimeout: time.Minute * 60,
+ MaxHeaderBytes: maxHeaderSize,
+ ReadHeaderTimeout: time.Minute * 60,
+ WriteTimeout: time.Minute * 60,
+ TLSConfig: &tls.Config{
+ CurvePreferences: []tls.CurveID{
+ tls.CurveP256,
+ tls.CurveP384,
+ tls.CurveP521,
+ tls.X25519,
+ },
+ CipherSuites: DefaultCipherSuites,
+ MinVersion: tls.VersionTLS12,
+ PreferServerCipherSuites: true,
+ },
+ }
+
+ go func() {
+ err := m.http.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ errCh <- err
+ return
+ }
+ }()
+
+ return errCh
+}
+
+// Stop prometheus metrics service.
+func (m *Plugin) Stop() error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.http != nil {
+ // timeout is 10 seconds
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ err := m.http.Shutdown(ctx)
+ if err != nil {
+ // Function should be Stop() error
+ m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
+ }
+ }
+ return nil
+}
+
+// Collects used to collect all plugins which implement metrics.StatProvider interface (and Named)
+func (m *Plugin) Collects() []interface{} {
+ return []interface{}{
+ m.AddStatProvider,
+ }
+}
+
+// Collector returns application specific collector by name or nil if collector not found.
+func (m *Plugin) AddStatProvider(name endure.Named, stat StatProvider) error {
+ m.collectors.Store(name.Name(), statsProvider{
+ collectors: stat.MetricsCollector(),
+ name: name.Name(),
+ })
+ return nil
+}
+
+// RPC interface satisfaction
+func (m *Plugin) Name() string {
+ return PluginName
+}
+
+// RPC interface satisfaction
+func (m *Plugin) RPC() interface{} {
+ return &rpcServer{
+ svc: m,
+ log: m.log,
+ }
+}
diff --git a/plugins/metrics/rpc.go b/plugins/metrics/rpc.go
new file mode 100644
index 00000000..f9c6accb
--- /dev/null
+++ b/plugins/metrics/rpc.go
@@ -0,0 +1,294 @@
+package metrics
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type rpcServer struct {
+ svc *Plugin
+ log logger.Logger
+}
+
+// Metric represent single metric produced by the application.
+type Metric struct {
+ // Collector name.
+ Name string
+
+ // Collector value.
+ Value float64
+
+ // Labels associated with metric. Only for vector metrics. Must be provided in a form of label values.
+ Labels []string
+}
+
+// Add new metric to the designated collector.
+func (rpc *rpcServer) Add(m *Metric, ok *bool) error {
+ const op = errors.Op("Add metric")
+ rpc.log.Info("Adding metric", "name", m.Name, "value", m.Value, "labels", m.Labels)
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ rpc.log.Error("undefined collector", "collector", m.Name)
+ return errors.E(op, errors.Errorf("undefined collector %s, try first Declare the desired collector", m.Name))
+ }
+
+ switch c := c.(type) {
+ case prometheus.Gauge:
+ c.Add(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ rpc.log.Error("required labels for collector", "collector", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector %s", m.Name))
+ }
+
+ gauge, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ gauge.Add(m.Value)
+ case prometheus.Counter:
+ c.Add(m.Value)
+
+ case *prometheus.CounterVec:
+ if len(m.Labels) == 0 {
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
+ }
+
+ gauge, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ gauge.Add(m.Value)
+
+ default:
+ return errors.E(op, errors.Errorf("collector %s does not support method `Add`", m.Name))
+ }
+
+ // RPC, set ok to true as return value. Need by rpc.Call reply argument
+ *ok = true
+ rpc.log.Info("new metric successfully added", "name", m.Name, "labels", m.Labels, "value", m.Value)
+ return nil
+}
+
+// Sub subtract the value from the specific metric (gauge only).
+func (rpc *rpcServer) Sub(m *Metric, ok *bool) error {
+ const op = errors.Op("Subtracting metric")
+ rpc.log.Info("Subtracting value from metric", "name", m.Name, "value", m.Value, "labels", m.Labels)
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ rpc.log.Error("undefined collector", "name", m.Name, "value", m.Value, "labels", m.Labels)
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+ if c == nil {
+ // can it be nil ??? I guess can't
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+
+ switch c := c.(type) {
+ case prometheus.Gauge:
+ c.Sub(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ rpc.log.Error("required labels for collector, but none was provided", "name", m.Name, "value", m.Value)
+ return errors.E(op, errors.Errorf("required labels for collector %s", m.Name))
+ }
+
+ gauge, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ gauge.Sub(m.Value)
+ default:
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Sub`", m.Name))
+ }
+ rpc.log.Info("Subtracting operation applied successfully", "name", m.Name, "labels", m.Labels, "value", m.Value)
+
+ *ok = true
+ return nil
+}
+
+// Observe the value (histogram and summary only).
+func (rpc *rpcServer) Observe(m *Metric, ok *bool) error {
+ const op = errors.Op("Observe metrics")
+ rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels)
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ rpc.log.Error("undefined collector", "name", m.Name, "value", m.Value, "labels", m.Labels)
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+ if c == nil {
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+
+ switch c := c.(type) {
+ case *prometheus.SummaryVec:
+ if len(m.Labels) == 0 {
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
+ }
+
+ observer, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ observer.Observe(m.Value)
+
+ case prometheus.Histogram:
+ c.Observe(m.Value)
+
+ case *prometheus.HistogramVec:
+ if len(m.Labels) == 0 {
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
+ }
+
+ observer, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ observer.Observe(m.Value)
+ default:
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Observe`", m.Name))
+ }
+
+ rpc.log.Info("observe operation finished successfully", "name", m.Name, "labels", m.Labels, "value", m.Value)
+
+ *ok = true
+ return nil
+}
+
+// Declare is used to register new collector in prometheus
+// THE TYPES ARE:
+// NamedCollector -> Collector with the name
+// bool -> RPC reply value
+// RETURNS:
+// error
+func (rpc *rpcServer) Declare(nc *NamedCollector, ok *bool) error {
+ const op = errors.Op("Declare metric")
+ rpc.log.Info("Declaring new metric", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace)
+ _, exist := rpc.svc.collectors.Load(nc.Name)
+ if exist {
+ rpc.log.Error("metric with provided name already exist", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace)
+ return errors.E(op, errors.Errorf("tried to register existing collector with the name `%s`", nc.Name))
+ }
+
+ var collector prometheus.Collector
+ switch nc.Type {
+ case Histogram:
+ opts := prometheus.HistogramOpts{
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ Buckets: nc.Buckets,
+ }
+
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewHistogramVec(opts, nc.Labels)
+ } else {
+ collector = prometheus.NewHistogram(opts)
+ }
+ case Gauge:
+ opts := prometheus.GaugeOpts{
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ }
+
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewGaugeVec(opts, nc.Labels)
+ } else {
+ collector = prometheus.NewGauge(opts)
+ }
+ case Counter:
+ opts := prometheus.CounterOpts{
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ }
+
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewCounterVec(opts, nc.Labels)
+ } else {
+ collector = prometheus.NewCounter(opts)
+ }
+ case Summary:
+ opts := prometheus.SummaryOpts{
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ }
+
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewSummaryVec(opts, nc.Labels)
+ } else {
+ collector = prometheus.NewSummary(opts)
+ }
+
+ default:
+ return errors.E(op, errors.Errorf("unknown collector type %s", nc.Type))
+ }
+
+ // add collector to sync.Map
+ rpc.svc.collectors.Store(nc.Name, collector)
+ // that method might panic, we handle it by recover
+ err := rpc.svc.Register(collector)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+
+ rpc.log.Info("metric successfully added", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace)
+
+ *ok = true
+ return nil
+}
+
+// Set the metric value (only for gaude).
+func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) {
+ const op = errors.Op("Set metric")
+ rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels)
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+ if c == nil {
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+
+ switch c := c.(type) {
+ case prometheus.Gauge:
+ c.Set(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ rpc.log.Error("required labels for collector", "collector", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector %s", m.Name))
+ }
+
+ gauge, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ gauge.Set(m.Value)
+
+ default:
+ return errors.E(op, errors.Errorf("collector `%s` does not support method Set", m.Name))
+ }
+
+ rpc.log.Info("set operation finished successfully", "name", m.Name, "labels", m.Labels, "value", m.Value)
+
+ *ok = true
+ return nil
+}