diff options
author | Valery Piashchynski <[email protected]> | 2020-12-21 19:42:23 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-21 19:42:23 +0300 |
commit | ee8b4075c0f836d698d1ae505c87c17147de447a (patch) | |
tree | 531d980e5bfb94ee39b03952a97e0445f7955409 /plugins/metrics | |
parent | 0ad45031047bb479e06ce0a0f496c6db9b2641c9 (diff) |
Move plugins to the roadrunner-plugins repository
Diffstat (limited to 'plugins/metrics')
-rw-r--r-- | plugins/metrics/config.go | 138 | ||||
-rw-r--r-- | plugins/metrics/config_test.go | 89 | ||||
-rw-r--r-- | plugins/metrics/doc.go | 1 | ||||
-rw-r--r-- | plugins/metrics/plugin.go | 230 | ||||
-rw-r--r-- | plugins/metrics/rpc.go | 294 | ||||
-rw-r--r-- | plugins/metrics/tests/.rr-test.yaml | 16 | ||||
-rw-r--r-- | plugins/metrics/tests/docker-compose.yml | 7 | ||||
-rw-r--r-- | plugins/metrics/tests/metrics_test.go | 739 | ||||
-rw-r--r-- | plugins/metrics/tests/plugin1.go | 46 |
9 files changed, 0 insertions, 1560 deletions
diff --git a/plugins/metrics/config.go b/plugins/metrics/config.go deleted file mode 100644 index 9459bc9b..00000000 --- a/plugins/metrics/config.go +++ /dev/null @@ -1,138 +0,0 @@ -package metrics - -import ( - "fmt" - - "github.com/prometheus/client_golang/prometheus" -) - -// Config configures metrics service. -type Config struct { - // Address to listen - Address string - - // Collect define application specific metrics. - Collect map[string]Collector -} - -type NamedCollector struct { - // Name of the collector - Name string `json:"name"` - - // Collector structure - Collector `json:"collector"` -} - -// CollectorType represents prometheus collector types -type CollectorType string - -const ( - // Histogram type - Histogram CollectorType = "histogram" - - // Gauge type - Gauge CollectorType = "gauge" - - // Counter type - Counter CollectorType = "counter" - - // Summary type - Summary CollectorType = "summary" -) - -// Collector describes single application specific metric. -type Collector struct { - // Namespace of the metric. - Namespace string `json:"namespace"` - // Subsystem of the metric. - Subsystem string `json:"subsystem"` - // Collector type (histogram, gauge, counter, summary). - Type CollectorType `json:"type"` - // Help of collector. - Help string `json:"help"` - // Labels for vectorized metrics. - Labels []string `json:"labels"` - // Buckets for histogram metric. - Buckets []float64 `json:"buckets"` - // Objectives for the summary opts - Objectives map[float64]float64 `json:"objectives"` -} - -// register application specific metrics. -func (c *Config) getCollectors() (map[string]prometheus.Collector, error) { - if c.Collect == nil { - return nil, nil - } - - collectors := make(map[string]prometheus.Collector) - - for name, m := range c.Collect { - var collector prometheus.Collector - switch m.Type { - case Histogram: - opts := prometheus.HistogramOpts{ - Name: name, - Namespace: m.Namespace, - Subsystem: m.Subsystem, - Help: m.Help, - Buckets: m.Buckets, - } - - if len(m.Labels) != 0 { - collector = prometheus.NewHistogramVec(opts, m.Labels) - } else { - collector = prometheus.NewHistogram(opts) - } - case Gauge: - opts := prometheus.GaugeOpts{ - Name: name, - Namespace: m.Namespace, - Subsystem: m.Subsystem, - Help: m.Help, - } - - if len(m.Labels) != 0 { - collector = prometheus.NewGaugeVec(opts, m.Labels) - } else { - collector = prometheus.NewGauge(opts) - } - case Counter: - opts := prometheus.CounterOpts{ - Name: name, - Namespace: m.Namespace, - Subsystem: m.Subsystem, - Help: m.Help, - } - - if len(m.Labels) != 0 { - collector = prometheus.NewCounterVec(opts, m.Labels) - } else { - collector = prometheus.NewCounter(opts) - } - case Summary: - opts := prometheus.SummaryOpts{ - Name: name, - Namespace: m.Namespace, - Subsystem: m.Subsystem, - Help: m.Help, - Objectives: m.Objectives, - } - - if len(m.Labels) != 0 { - collector = prometheus.NewSummaryVec(opts, m.Labels) - } else { - collector = prometheus.NewSummary(opts) - } - default: - return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name) - } - - collectors[name] = collector - } - - return collectors, nil -} - -func (c *Config) InitDefaults() { - -} diff --git a/plugins/metrics/config_test.go b/plugins/metrics/config_test.go deleted file mode 100644 index 665ec9cd..00000000 --- a/plugins/metrics/config_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package metrics - -import ( - "bytes" - "testing" - - j "github.com/json-iterator/go" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" -) - -var json = j.ConfigCompatibleWithStandardLibrary - -func Test_Config_Hydrate_Error1(t *testing.T) { - cfg := `{"request": {"From": "Something"}}` - c := &Config{} - f := new(bytes.Buffer) - f.WriteString(cfg) - - err := json.Unmarshal(f.Bytes(), &c) - if err != nil { - t.Fatal(err) - } -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := `{"dir": "/dir/"` - c := &Config{} - - f := new(bytes.Buffer) - f.WriteString(cfg) - - err := json.Unmarshal(f.Bytes(), &c) - assert.Error(t, err) -} - -func Test_Config_Metrics(t *testing.T) { - cfg := `{ -"collect":{ - "metric1":{"type": "gauge"}, - "metric2":{ "type": "counter"}, - "metric3":{"type": "summary"}, - "metric4":{"type": "histogram"} -} -}` - c := &Config{} - f := new(bytes.Buffer) - f.WriteString(cfg) - - err := json.Unmarshal(f.Bytes(), &c) - if err != nil { - t.Fatal(err) - } - - m, err := c.getCollectors() - assert.NoError(t, err) - - assert.IsType(t, prometheus.NewGauge(prometheus.GaugeOpts{}), m["metric1"]) - assert.IsType(t, prometheus.NewCounter(prometheus.CounterOpts{}), m["metric2"]) - assert.IsType(t, prometheus.NewSummary(prometheus.SummaryOpts{}), m["metric3"]) - assert.IsType(t, prometheus.NewHistogram(prometheus.HistogramOpts{}), m["metric4"]) -} - -func Test_Config_MetricsVector(t *testing.T) { - cfg := `{ -"collect":{ - "metric1":{"type": "gauge","labels":["label"]}, - "metric2":{ "type": "counter","labels":["label"]}, - "metric3":{"type": "summary","labels":["label"]}, - "metric4":{"type": "histogram","labels":["label"]} -} -}` - c := &Config{} - f := new(bytes.Buffer) - f.WriteString(cfg) - - err := json.Unmarshal(f.Bytes(), &c) - if err != nil { - t.Fatal(err) - } - - m, err := c.getCollectors() - assert.NoError(t, err) - - assert.IsType(t, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{}), m["metric1"]) - assert.IsType(t, prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{}), m["metric2"]) - assert.IsType(t, prometheus.NewSummaryVec(prometheus.SummaryOpts{}, []string{}), m["metric3"]) - assert.IsType(t, prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{}), m["metric4"]) -} diff --git a/plugins/metrics/doc.go b/plugins/metrics/doc.go deleted file mode 100644 index 1abe097a..00000000 --- a/plugins/metrics/doc.go +++ /dev/null @@ -1 +0,0 @@ -package metrics diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go deleted file mode 100644 index 1f3ca005..00000000 --- a/plugins/metrics/plugin.go +++ /dev/null @@ -1,230 +0,0 @@ -package metrics - -import ( - "context" - "crypto/tls" - "net/http" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/spiral/endure" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/config" - "github.com/spiral/roadrunner/v2/interfaces/log" - "github.com/spiral/roadrunner/v2/interfaces/metrics" - "golang.org/x/sys/cpu" -) - -const ( - // PluginName declares plugin name. - PluginName = "metrics" - // maxHeaderSize declares max header size for prometheus server - maxHeaderSize = 1024 * 1024 * 100 // 104MB -) - -type statsProvider struct { - collectors []prometheus.Collector - name string -} - -// Plugin to manage application metrics using Prometheus. -type Plugin struct { - cfg Config - log log.Logger - mu sync.Mutex // all receivers are pointers - http *http.Server - collectors sync.Map // all receivers are pointers - registry *prometheus.Registry -} - -// Init service. -func (m *Plugin) Init(cfg config.Configurer, log log.Logger) error { - const op = errors.Op("Metrics Init") - err := cfg.UnmarshalKey(PluginName, &m.cfg) - if err != nil { - return err - } - - // TODO figure out what is Init - m.cfg.InitDefaults() - - m.log = log - m.registry = prometheus.NewRegistry() - - // Default - err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) - if err != nil { - return errors.E(op, err) - } - - // Default - err = m.registry.Register(prometheus.NewGoCollector()) - if err != nil { - return errors.E(op, err) - } - - collectors, err := m.cfg.getCollectors() - if err != nil { - return errors.E(op, err) - } - - // Register invocation will be later in the Serve method - for k, v := range collectors { - m.collectors.Store(k, statsProvider{ - collectors: []prometheus.Collector{v}, - name: k, - }) - } - return nil -} - -// Register new prometheus collector. -func (m *Plugin) Register(c prometheus.Collector) error { - return m.registry.Register(c) -} - -// Serve prometheus metrics service. -func (m *Plugin) Serve() chan error { - errCh := make(chan error, 1) - m.collectors.Range(func(key, value interface{}) bool { - // key - name - // value - statsProvider struct - c := value.(statsProvider) - for _, v := range c.collectors { - if err := m.registry.Register(v); err != nil { - errCh <- err - return false - } - } - - return true - }) - - var topCipherSuites []uint16 - var defaultCipherSuitesTLS13 []uint16 - - hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ - hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL - // Keep in sync with crypto/aes/cipher_s390x.go. - hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM) - - hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X - - if hasGCMAsm { - // If AES-GCM hardware is provided then prioritise AES-GCM - // cipher suites. - topCipherSuites = []uint16{ - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, - } - defaultCipherSuitesTLS13 = []uint16{ - tls.TLS_AES_128_GCM_SHA256, - tls.TLS_CHACHA20_POLY1305_SHA256, - tls.TLS_AES_256_GCM_SHA384, - } - } else { - // Without AES-GCM hardware, we put the ChaCha20-Poly1305 - // cipher suites first. - topCipherSuites = []uint16{ - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - } - defaultCipherSuitesTLS13 = []uint16{ - tls.TLS_CHACHA20_POLY1305_SHA256, - tls.TLS_AES_128_GCM_SHA256, - tls.TLS_AES_256_GCM_SHA384, - } - } - - DefaultCipherSuites := make([]uint16, 0, 22) - DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) - DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) - - m.http = &http.Server{ - Addr: m.cfg.Address, - Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}), - IdleTimeout: time.Hour * 24, - ReadTimeout: time.Minute * 60, - MaxHeaderBytes: maxHeaderSize, - ReadHeaderTimeout: time.Minute * 60, - WriteTimeout: time.Minute * 60, - TLSConfig: &tls.Config{ - CurvePreferences: []tls.CurveID{ - tls.CurveP256, - tls.CurveP384, - tls.CurveP521, - tls.X25519, - }, - CipherSuites: DefaultCipherSuites, - MinVersion: tls.VersionTLS12, - PreferServerCipherSuites: true, - }, - } - - go func() { - err := m.http.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - errCh <- err - return - } - }() - - return errCh -} - -// Stop prometheus metrics service. -func (m *Plugin) Stop() error { - m.mu.Lock() - defer m.mu.Unlock() - - if m.http != nil { - // timeout is 10 seconds - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - err := m.http.Shutdown(ctx) - if err != nil { - // Function should be Stop() error - m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err)) - } - } - return nil -} - -// Collects used to collect all plugins which implement metrics.StatProvider interface (and Named) -func (m *Plugin) Collects() []interface{} { - return []interface{}{ - m.AddStatProvider, - } -} - -// Collector returns application specific collector by name or nil if collector not found. -func (m *Plugin) AddStatProvider(name endure.Named, stat metrics.StatProvider) error { - m.collectors.Store(name.Name(), statsProvider{ - collectors: stat.MetricsCollector(), - name: name.Name(), - }) - return nil -} - -// RPC interface satisfaction -func (m *Plugin) Name() string { - return PluginName -} - -// RPC interface satisfaction -func (m *Plugin) RPC() interface{} { - return &rpcServer{ - svc: m, - log: m.log, - } -} diff --git a/plugins/metrics/rpc.go b/plugins/metrics/rpc.go deleted file mode 100644 index b8897098..00000000 --- a/plugins/metrics/rpc.go +++ /dev/null @@ -1,294 +0,0 @@ -package metrics - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/interfaces/log" -) - -type rpcServer struct { - svc *Plugin - log log.Logger -} - -// Metric represent single metric produced by the application. -type Metric struct { - // Collector name. - Name string - - // Collector value. - Value float64 - - // Labels associated with metric. Only for vector metrics. Must be provided in a form of label values. - Labels []string -} - -// Add new metric to the designated collector. -func (rpc *rpcServer) Add(m *Metric, ok *bool) error { - const op = errors.Op("Add metric") - rpc.log.Info("Adding metric", "name", m.Name, "value", m.Value, "labels", m.Labels) - c, exist := rpc.svc.collectors.Load(m.Name) - if !exist { - rpc.log.Error("undefined collector", "collector", m.Name) - return errors.E(op, errors.Errorf("undefined collector %s, try first Declare the desired collector", m.Name)) - } - - switch c := c.(type) { - case prometheus.Gauge: - c.Add(m.Value) - - case *prometheus.GaugeVec: - if len(m.Labels) == 0 { - rpc.log.Error("required labels for collector", "collector", m.Name) - return errors.E(op, errors.Errorf("required labels for collector %s", m.Name)) - } - - gauge, err := c.GetMetricWithLabelValues(m.Labels...) - if err != nil { - rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) - return errors.E(op, err) - } - gauge.Add(m.Value) - case prometheus.Counter: - c.Add(m.Value) - - case *prometheus.CounterVec: - if len(m.Labels) == 0 { - return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) - } - - gauge, err := c.GetMetricWithLabelValues(m.Labels...) - if err != nil { - rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) - return errors.E(op, err) - } - gauge.Add(m.Value) - - default: - return errors.E(op, errors.Errorf("collector %s does not support method `Add`", m.Name)) - } - - // RPC, set ok to true as return value. Need by rpc.Call reply argument - *ok = true - rpc.log.Info("new metric successfully added", "name", m.Name, "labels", m.Labels, "value", m.Value) - return nil -} - -// Sub subtract the value from the specific metric (gauge only). -func (rpc *rpcServer) Sub(m *Metric, ok *bool) error { - const op = errors.Op("Subtracting metric") - rpc.log.Info("Subtracting value from metric", "name", m.Name, "value", m.Value, "labels", m.Labels) - c, exist := rpc.svc.collectors.Load(m.Name) - if !exist { - rpc.log.Error("undefined collector", "name", m.Name, "value", m.Value, "labels", m.Labels) - return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) - } - if c == nil { - // can it be nil ??? I guess can't - return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) - } - - switch c := c.(type) { - case prometheus.Gauge: - c.Sub(m.Value) - - case *prometheus.GaugeVec: - if len(m.Labels) == 0 { - rpc.log.Error("required labels for collector, but none was provided", "name", m.Name, "value", m.Value) - return errors.E(op, errors.Errorf("required labels for collector %s", m.Name)) - } - - gauge, err := c.GetMetricWithLabelValues(m.Labels...) - if err != nil { - rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) - return errors.E(op, err) - } - gauge.Sub(m.Value) - default: - return errors.E(op, errors.Errorf("collector `%s` does not support method `Sub`", m.Name)) - } - rpc.log.Info("Subtracting operation applied successfully", "name", m.Name, "labels", m.Labels, "value", m.Value) - - *ok = true - return nil -} - -// Observe the value (histogram and summary only). -func (rpc *rpcServer) Observe(m *Metric, ok *bool) error { - const op = errors.Op("Observe metrics") - rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels) - - c, exist := rpc.svc.collectors.Load(m.Name) - if !exist { - rpc.log.Error("undefined collector", "name", m.Name, "value", m.Value, "labels", m.Labels) - return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) - } - if c == nil { - return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) - } - - switch c := c.(type) { - case *prometheus.SummaryVec: - if len(m.Labels) == 0 { - return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) - } - - observer, err := c.GetMetricWithLabelValues(m.Labels...) - if err != nil { - return errors.E(op, err) - } - observer.Observe(m.Value) - - case prometheus.Histogram: - c.Observe(m.Value) - - case *prometheus.HistogramVec: - if len(m.Labels) == 0 { - return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) - } - - observer, err := c.GetMetricWithLabelValues(m.Labels...) - if err != nil { - rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) - return errors.E(op, err) - } - observer.Observe(m.Value) - default: - return errors.E(op, errors.Errorf("collector `%s` does not support method `Observe`", m.Name)) - } - - rpc.log.Info("observe operation finished successfully", "name", m.Name, "labels", m.Labels, "value", m.Value) - - *ok = true - return nil -} - -// Declare is used to register new collector in prometheus -// THE TYPES ARE: -// NamedCollector -> Collector with the name -// bool -> RPC reply value -// RETURNS: -// error -func (rpc *rpcServer) Declare(nc *NamedCollector, ok *bool) error { - const op = errors.Op("Declare metric") - rpc.log.Info("Declaring new metric", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace) - _, exist := rpc.svc.collectors.Load(nc.Name) - if exist { - rpc.log.Error("metric with provided name already exist", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace) - return errors.E(op, errors.Errorf("tried to register existing collector with the name `%s`", nc.Name)) - } - - var collector prometheus.Collector - switch nc.Type { - case Histogram: - opts := prometheus.HistogramOpts{ - Name: nc.Name, - Namespace: nc.Namespace, - Subsystem: nc.Subsystem, - Help: nc.Help, - Buckets: nc.Buckets, - } - - if len(nc.Labels) != 0 { - collector = prometheus.NewHistogramVec(opts, nc.Labels) - } else { - collector = prometheus.NewHistogram(opts) - } - case Gauge: - opts := prometheus.GaugeOpts{ - Name: nc.Name, - Namespace: nc.Namespace, - Subsystem: nc.Subsystem, - Help: nc.Help, - } - - if len(nc.Labels) != 0 { - collector = prometheus.NewGaugeVec(opts, nc.Labels) - } else { - collector = prometheus.NewGauge(opts) - } - case Counter: - opts := prometheus.CounterOpts{ - Name: nc.Name, - Namespace: nc.Namespace, - Subsystem: nc.Subsystem, - Help: nc.Help, - } - - if len(nc.Labels) != 0 { - collector = prometheus.NewCounterVec(opts, nc.Labels) - } else { - collector = prometheus.NewCounter(opts) - } - case Summary: - opts := prometheus.SummaryOpts{ - Name: nc.Name, - Namespace: nc.Namespace, - Subsystem: nc.Subsystem, - Help: nc.Help, - } - - if len(nc.Labels) != 0 { - collector = prometheus.NewSummaryVec(opts, nc.Labels) - } else { - collector = prometheus.NewSummary(opts) - } - - default: - return errors.E(op, errors.Errorf("unknown collector type %s", nc.Type)) - } - - // add collector to sync.Map - rpc.svc.collectors.Store(nc.Name, collector) - // that method might panic, we handle it by recover - err := rpc.svc.Register(collector) - if err != nil { - *ok = false - return errors.E(op, err) - } - - rpc.log.Info("metric successfully added", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace) - - *ok = true - return nil -} - -// Set the metric value (only for gaude). -func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) { - const op = errors.Op("Set metric") - rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels) - - c, exist := rpc.svc.collectors.Load(m.Name) - if !exist { - return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) - } - if c == nil { - return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) - } - - switch c := c.(type) { - case prometheus.Gauge: - c.Set(m.Value) - - case *prometheus.GaugeVec: - if len(m.Labels) == 0 { - rpc.log.Error("required labels for collector", "collector", m.Name) - return errors.E(op, errors.Errorf("required labels for collector %s", m.Name)) - } - - gauge, err := c.GetMetricWithLabelValues(m.Labels...) - if err != nil { - rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) - return errors.E(op, err) - } - gauge.Set(m.Value) - - default: - return errors.E(op, errors.Errorf("collector `%s` does not support method Set", m.Name)) - } - - rpc.log.Info("set operation finished successfully", "name", m.Name, "labels", m.Labels, "value", m.Value) - - *ok = true - return nil -} diff --git a/plugins/metrics/tests/.rr-test.yaml b/plugins/metrics/tests/.rr-test.yaml deleted file mode 100644 index 37c50395..00000000 --- a/plugins/metrics/tests/.rr-test.yaml +++ /dev/null @@ -1,16 +0,0 @@ -rpc: - listen: tcp://127.0.0.1:6001 - disabled: false - -metrics: - # prometheus client address (path /metrics added automatically) - address: localhost:2112 - collect: - app_metric: - type: histogram - help: "Custom application metric" - labels: [ "type" ] - buckets: [ 0.1, 0.2, 0.3, 1.0 ] -logs: - mode: development - level: error
\ No newline at end of file diff --git a/plugins/metrics/tests/docker-compose.yml b/plugins/metrics/tests/docker-compose.yml deleted file mode 100644 index 610633b4..00000000 --- a/plugins/metrics/tests/docker-compose.yml +++ /dev/null @@ -1,7 +0,0 @@ -version: '3.7' - -services: - prometheus: - image: prom/prometheus - ports: - - 9090:9090 diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go deleted file mode 100644 index 4572bc3f..00000000 --- a/plugins/metrics/tests/metrics_test.go +++ /dev/null @@ -1,739 +0,0 @@ -package tests - -import ( - "io/ioutil" - "net" - "net/http" - "net/rpc" - "os" - "os/signal" - "syscall" - "testing" - "time" - - "github.com/spiral/endure" - goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/metrics" - rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" - "github.com/stretchr/testify/assert" -) - -const dialAddr = "127.0.0.1:6001" -const dialNetwork = "tcp" -const getAddr = "http://localhost:2112/metrics" - -// get request and return body -func get() (string, error) { - r, err := http.Get(getAddr) - if err != nil { - return "", err - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return "", err - } - - err = r.Body.Close() - if err != nil { - return "", err - } - // unsafe - return string(b), err -} - -func TestMetricsInit(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - cfg := &config.Viper{} - cfg.Prefix = "rr" - cfg.Path = ".rr-test.yaml" - - err = cont.RegisterAll( - cfg, - &metrics.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &Plugin1{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - tt := time.NewTimer(time.Second * 5) - - out, err := get() - assert.NoError(t, err) - - assert.Contains(t, out, "go_gc_duration_seconds") - - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-tt.C: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } -} - -func TestMetricsGaugeCollector(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - cfg := &config.Viper{} - cfg.Prefix = "rr" - cfg.Path = ".rr-test.yaml" - - err = cont.RegisterAll( - cfg, - &metrics.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - &Plugin1{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - time.Sleep(time.Second) - tt := time.NewTimer(time.Second * 5) - - out, err := get() - assert.NoError(t, err) - assert.Contains(t, out, "my_gauge 100") - assert.Contains(t, out, "my_gauge2 100") - - out, err = get() - assert.NoError(t, err) - assert.Contains(t, out, "go_gc_duration_seconds") - - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-tt.C: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } -} - -func TestMetricsDifferentRPCCalls(t *testing.T) { - cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) - if err != nil { - t.Fatal(err) - } - - cfg := &config.Viper{} - cfg.Prefix = "rr" - cfg.Path = ".rr-test.yaml" - - err = cont.RegisterAll( - cfg, - &metrics.Plugin{}, - &rpcPlugin.Plugin{}, - &logger.ZapLogger{}, - ) - assert.NoError(t, err) - - err = cont.Init() - if err != nil { - t.Fatal(err) - } - - ch, err := cont.Serve() - assert.NoError(t, err) - - sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - - go func() { - tt := time.NewTimer(time.Minute * 3) - for { - select { - case e := <-ch: - assert.Fail(t, "error", e.Error.Error()) - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - case <-sig: - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - case <-tt.C: - // timeout - err = cont.Stop() - if err != nil { - assert.FailNow(t, "error", err.Error()) - } - return - } - } - }() - - t.Run("DeclareMetric", declareMetricsTest) - genericOut, err := get() - assert.NoError(t, err) - assert.Contains(t, genericOut, "test_metrics_named_collector") - - t.Run("AddMetric", addMetricsTest) - genericOut, err = get() - assert.NoError(t, err) - assert.Contains(t, genericOut, "test_metrics_named_collector 10000") - - t.Run("SetMetric", setMetric) - genericOut, err = get() - assert.NoError(t, err) - assert.Contains(t, genericOut, "user_gauge_collector 100") - - t.Run("VectorMetric", vectorMetric) - genericOut, err = get() - assert.NoError(t, err) - assert.Contains(t, genericOut, "gauge_2_collector{section=\"first\",type=\"core\"} 100") - - t.Run("MissingSection", missingSection) - t.Run("SetWithoutLabels", setWithoutLabels) - t.Run("SetOnHistogram", setOnHistogram) - t.Run("MetricSub", subMetric) - genericOut, err = get() - assert.NoError(t, err) - assert.Contains(t, genericOut, "sub_gauge_subMetric 1") - - t.Run("SubVector", subVector) - genericOut, err = get() - assert.NoError(t, err) - assert.Contains(t, genericOut, "sub_gauge_subVector{section=\"first\",type=\"core\"} 1") - - t.Run("RegisterHistogram", registerHistogram) - - genericOut, err = get() - assert.NoError(t, err) - assert.Contains(t, genericOut, `TYPE histogram_registerHistogram`) - - // check buckets - assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="0.1"} 0`) - assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="0.2"} 0`) - assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="0.5"} 0`) - assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="+Inf"} 0`) - assert.Contains(t, genericOut, `histogram_registerHistogram_sum 0`) - assert.Contains(t, genericOut, `histogram_registerHistogram_count 0`) - - t.Run("CounterMetric", counterMetric) - genericOut, err = get() - assert.NoError(t, err) - assert.Contains(t, genericOut, "HELP default_default_counter_CounterMetric test_counter") - assert.Contains(t, genericOut, `default_default_counter_CounterMetric{section="section2",type="type2"}`) - - t.Run("ObserveMetric", observeMetric) - genericOut, err = get() - assert.NoError(t, err) - assert.Contains(t, genericOut, "observe_observeMetric") - - t.Run("ObserveMetricNotEnoughLabels", observeMetricNotEnoughLabels) - - close(sig) -} - -func observeMetricNotEnoughLabels(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "observe_observeMetricNotEnoughLabels", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Help: "test_observe", - Type: metrics.Histogram, - Labels: []string{"type", "section"}, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - ret = false - - assert.Error(t, client.Call("metrics.Observe", metrics.Metric{ - Name: "observe_observeMetric", - Value: 100.0, - Labels: []string{"test"}, - }, &ret)) - assert.False(t, ret) -} - -func observeMetric(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "observe_observeMetric", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Help: "test_observe", - Type: metrics.Histogram, - Labels: []string{"type", "section"}, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - ret = false - - assert.NoError(t, client.Call("metrics.Observe", metrics.Metric{ - Name: "observe_observeMetric", - Value: 100.0, - Labels: []string{"test", "test2"}, - }, &ret)) - assert.True(t, ret) -} - -func counterMetric(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "counter_CounterMetric", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Help: "test_counter", - Type: metrics.Counter, - Labels: []string{"type", "section"}, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - - ret = false - - assert.NoError(t, client.Call("metrics.Add", metrics.Metric{ - Name: "counter_CounterMetric", - Value: 100.0, - Labels: []string{"type2", "section2"}, - }, &ret)) - assert.True(t, ret) -} - -func registerHistogram(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "histogram_registerHistogram", - Collector: metrics.Collector{ - Help: "test_histogram", - Type: metrics.Histogram, - Buckets: []float64{0.1, 0.2, 0.5}, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - - ret = false - - m := metrics.Metric{ - Name: "histogram_registerHistogram", - Value: 10000, - Labels: nil, - } - - err = client.Call("metrics.Add", m, &ret) - assert.Error(t, err) - assert.False(t, ret) -} - -func subVector(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "sub_gauge_subVector", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Type: metrics.Gauge, - Labels: []string{"type", "section"}, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - ret = false - - m := metrics.Metric{ - Name: "sub_gauge_subVector", - Value: 100000, - Labels: []string{"core", "first"}, - } - - err = client.Call("metrics.Add", m, &ret) - assert.NoError(t, err) - assert.True(t, ret) - ret = false - - m = metrics.Metric{ - Name: "sub_gauge_subVector", - Value: 99999, - Labels: []string{"core", "first"}, - } - - err = client.Call("metrics.Sub", m, &ret) - assert.NoError(t, err) - assert.True(t, ret) -} - -func subMetric(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "sub_gauge_subMetric", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Type: metrics.Gauge, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - ret = false - - m := metrics.Metric{ - Name: "sub_gauge_subMetric", - Value: 100000, - } - - err = client.Call("metrics.Add", m, &ret) - assert.NoError(t, err) - assert.True(t, ret) - ret = false - - m = metrics.Metric{ - Name: "sub_gauge_subMetric", - Value: 99999, - } - - err = client.Call("metrics.Sub", m, &ret) - assert.NoError(t, err) - assert.True(t, ret) -} - -func setOnHistogram(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "histogram_setOnHistogram", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Type: metrics.Histogram, - Labels: []string{"type", "section"}, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - - ret = false - - m := metrics.Metric{ - Name: "gauge_setOnHistogram", - Value: 100.0, - } - - err = client.Call("metrics.Set", m, &ret) // expected 2 label values but got 1 in []string{"missing"} - assert.Error(t, err) - assert.False(t, ret) -} - -func setWithoutLabels(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "gauge_setWithoutLabels", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Type: metrics.Gauge, - Labels: []string{"type", "section"}, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - - ret = false - - m := metrics.Metric{ - Name: "gauge_setWithoutLabels", - Value: 100.0, - } - - err = client.Call("metrics.Set", m, &ret) // expected 2 label values but got 1 in []string{"missing"} - assert.Error(t, err) - assert.False(t, ret) -} - -func missingSection(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "gauge_missing_section_collector", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Type: metrics.Gauge, - Labels: []string{"type", "section"}, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - - ret = false - - m := metrics.Metric{ - Name: "gauge_missing_section_collector", - Value: 100.0, - Labels: []string{"missing"}, - } - - err = client.Call("metrics.Set", m, &ret) // expected 2 label values but got 1 in []string{"missing"} - assert.Error(t, err) - assert.False(t, ret) -} - -func vectorMetric(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "gauge_2_collector", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Type: metrics.Gauge, - Labels: []string{"type", "section"}, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - - ret = false - - m := metrics.Metric{ - Name: "gauge_2_collector", - Value: 100.0, - Labels: []string{"core", "first"}, - } - - err = client.Call("metrics.Set", m, &ret) - assert.NoError(t, err) - assert.True(t, ret) -} - -func setMetric(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "user_gauge_collector", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Type: metrics.Gauge, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) - ret = false - - m := metrics.Metric{ - Name: "user_gauge_collector", - Value: 100.0, - } - - err = client.Call("metrics.Set", m, &ret) - assert.NoError(t, err) - assert.True(t, ret) -} - -func addMetricsTest(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - m := metrics.Metric{ - Name: "test_metrics_named_collector", - Value: 10000, - Labels: nil, - } - - err = client.Call("metrics.Add", m, &ret) - assert.NoError(t, err) - assert.True(t, ret) -} - -func declareMetricsTest(t *testing.T) { - conn, err := net.Dial(dialNetwork, dialAddr) - assert.NoError(t, err) - defer func() { - _ = conn.Close() - }() - client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) - var ret bool - - nc := metrics.NamedCollector{ - Name: "test_metrics_named_collector", - Collector: metrics.Collector{ - Namespace: "default", - Subsystem: "default", - Type: metrics.Counter, - Help: "NO HELP!", - Labels: nil, - Buckets: nil, - }, - } - - err = client.Call("metrics.Declare", nc, &ret) - assert.NoError(t, err) - assert.True(t, ret) -} diff --git a/plugins/metrics/tests/plugin1.go b/plugins/metrics/tests/plugin1.go deleted file mode 100644 index 7a5e5e6c..00000000 --- a/plugins/metrics/tests/plugin1.go +++ /dev/null @@ -1,46 +0,0 @@ -package tests - -import ( - "github.com/prometheus/client_golang/prometheus" - "github.com/spiral/roadrunner/v2/interfaces/config" -) - -// Gauge ////////////// -type Plugin1 struct { - config config.Configurer -} - -func (p1 *Plugin1) Init(cfg config.Configurer) error { - p1.config = cfg - return nil -} - -func (p1 *Plugin1) Serve() chan error { - errCh := make(chan error, 1) - return errCh -} - -func (p1 *Plugin1) Stop() error { - return nil -} - -func (p1 *Plugin1) Name() string { - return "metrics_test.plugin1" -} - -func (p1 *Plugin1) MetricsCollector() []prometheus.Collector { - collector := prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "my_gauge", - Help: "My gauge value", - }) - - collector.Set(100) - - collector2 := prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "my_gauge2", - Help: "My gauge2 value", - }) - - collector2.Set(100) - return []prometheus.Collector{collector, collector2} -} |