diff options
Diffstat (limited to 'service/metrics')
-rw-r--r-- | service/metrics/config.go | 136 | ||||
-rw-r--r-- | service/metrics/config_test.go | 75 | ||||
-rw-r--r-- | service/metrics/rpc.go | 260 | ||||
-rw-r--r-- | service/metrics/rpc_test.go | 861 | ||||
-rw-r--r-- | service/metrics/service.go | 191 | ||||
-rw-r--r-- | service/metrics/service_test.go | 247 |
6 files changed, 0 insertions, 1770 deletions
diff --git a/service/metrics/config.go b/service/metrics/config.go deleted file mode 100644 index c95fd940..00000000 --- a/service/metrics/config.go +++ /dev/null @@ -1,136 +0,0 @@ -package metrics - -import ( - "fmt" - "github.com/prometheus/client_golang/prometheus" - "github.com/spiral/roadrunner/service" -) - -// Config configures metrics service. -type Config struct { - // Address to listen - Address string - - // Collect define application specific metrics. - Collect map[string]Collector -} - -type NamedCollector struct { - // Name of the collector - Name string `json:"name"` - - // Collector structure - Collector `json:"collector"` -} - -// CollectorType represents prometheus collector types -type CollectorType string - -const ( - // Histogram type - Histogram CollectorType = "histogram" - - // Gauge type - Gauge CollectorType = "gauge" - - // Counter type - Counter CollectorType = "counter" - - // Summary type - Summary CollectorType = "summary" -) - -// Collector describes single application specific metric. -type Collector struct { - // Namespace of the metric. - Namespace string `json:"namespace"` - // Subsystem of the metric. - Subsystem string `json:"subsystem"` - // Collector type (histogram, gauge, counter, summary). - Type CollectorType `json:"type"` - // Help of collector. - Help string `json:"help"` - // Labels for vectorized metrics. - Labels []string `json:"labels"` - // Buckets for histogram metric. - Buckets []float64 `json:"buckets"` -} - -// Hydrate configuration. -func (c *Config) Hydrate(cfg service.Config) error { - return cfg.Unmarshal(c) -} - -// register application specific metrics. -func (c *Config) getCollectors() (map[string]prometheus.Collector, error) { - if c.Collect == nil { - return nil, nil - } - - collectors := make(map[string]prometheus.Collector) - - for name, m := range c.Collect { - var collector prometheus.Collector - switch m.Type { - case Histogram: - opts := prometheus.HistogramOpts{ - Name: name, - Namespace: m.Namespace, - Subsystem: m.Subsystem, - Help: m.Help, - Buckets: m.Buckets, - } - - if len(m.Labels) != 0 { - collector = prometheus.NewHistogramVec(opts, m.Labels) - } else { - collector = prometheus.NewHistogram(opts) - } - case Gauge: - opts := prometheus.GaugeOpts{ - Name: name, - Namespace: m.Namespace, - Subsystem: m.Subsystem, - Help: m.Help, - } - - if len(m.Labels) != 0 { - collector = prometheus.NewGaugeVec(opts, m.Labels) - } else { - collector = prometheus.NewGauge(opts) - } - case Counter: - opts := prometheus.CounterOpts{ - Name: name, - Namespace: m.Namespace, - Subsystem: m.Subsystem, - Help: m.Help, - } - - if len(m.Labels) != 0 { - collector = prometheus.NewCounterVec(opts, m.Labels) - } else { - collector = prometheus.NewCounter(opts) - } - case Summary: - opts := prometheus.SummaryOpts{ - Name: name, - Namespace: m.Namespace, - Subsystem: m.Subsystem, - Help: m.Help, - } - - if len(m.Labels) != 0 { - collector = prometheus.NewSummaryVec(opts, m.Labels) - } else { - collector = prometheus.NewSummary(opts) - } - default: - return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name) - } - - collectors[name] = collector - } - - return collectors, nil -} diff --git a/service/metrics/config_test.go b/service/metrics/config_test.go deleted file mode 100644 index a64e9047..00000000 --- a/service/metrics/config_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package metrics - -import ( - json "github.com/json-iterator/go" - "github.com/prometheus/client_golang/prometheus" - "github.com/spiral/roadrunner/service" - "github.com/stretchr/testify/assert" - "testing" -) - -type mockCfg struct{ cfg string } - -func (cfg *mockCfg) Get(name string) service.Config { return nil } -func (cfg *mockCfg) Unmarshal(out interface{}) error { - j := json.ConfigCompatibleWithStandardLibrary - return j.Unmarshal([]byte(cfg.cfg), out) -} - -func Test_Config_Hydrate_Error1(t *testing.T) { - cfg := &mockCfg{`{"request": {"From": "Something"}}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) -} - -func Test_Config_Hydrate_Error2(t *testing.T) { - cfg := &mockCfg{`{"dir": "/dir/"`} - c := &Config{} - - assert.Error(t, c.Hydrate(cfg)) -} - -func Test_Config_Metrics(t *testing.T) { - cfg := &mockCfg{`{ -"collect":{ - "metric1":{"type": "gauge"}, - "metric2":{ "type": "counter"}, - "metric3":{"type": "summary"}, - "metric4":{"type": "histogram"} -} -}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - m, err := c.getCollectors() - assert.NoError(t, err) - - assert.IsType(t, prometheus.NewGauge(prometheus.GaugeOpts{}), m["metric1"]) - assert.IsType(t, prometheus.NewCounter(prometheus.CounterOpts{}), m["metric2"]) - assert.IsType(t, prometheus.NewSummary(prometheus.SummaryOpts{}), m["metric3"]) - assert.IsType(t, prometheus.NewHistogram(prometheus.HistogramOpts{}), m["metric4"]) -} - -func Test_Config_MetricsVector(t *testing.T) { - cfg := &mockCfg{`{ -"collect":{ - "metric1":{"type": "gauge","labels":["label"]}, - "metric2":{ "type": "counter","labels":["label"]}, - "metric3":{"type": "summary","labels":["label"]}, - "metric4":{"type": "histogram","labels":["label"]} -} -}`} - c := &Config{} - - assert.NoError(t, c.Hydrate(cfg)) - - m, err := c.getCollectors() - assert.NoError(t, err) - - assert.IsType(t, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{}), m["metric1"]) - assert.IsType(t, prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{}), m["metric2"]) - assert.IsType(t, prometheus.NewSummaryVec(prometheus.SummaryOpts{}, []string{}), m["metric3"]) - assert.IsType(t, prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{}), m["metric4"]) -} diff --git a/service/metrics/rpc.go b/service/metrics/rpc.go deleted file mode 100644 index 377d6173..00000000 --- a/service/metrics/rpc.go +++ /dev/null @@ -1,260 +0,0 @@ -package metrics - -import ( - "fmt" - "github.com/prometheus/client_golang/prometheus" -) - -type rpcServer struct { - svc *Service -} - -// Metric represent single metric produced by the application. -type Metric struct { - // Collector name. - Name string - - // Collector value. - Value float64 - - // Labels associated with metric. Only for vector metrics. Must be provided in a form of label values. - Labels []string -} - -// Add new metric to the designated collector. -func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) { - defer func() { - if r, fail := recover().(error); fail { - err = r - } - }() - - c := rpc.svc.Collector(m.Name) - if c == nil { - return fmt.Errorf("undefined collector `%s`", m.Name) - } - - switch c := c.(type) { - case prometheus.Gauge: - c.Add(m.Value) - - case *prometheus.GaugeVec: - if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) - } - - c.WithLabelValues(m.Labels...).Add(m.Value) - - case prometheus.Counter: - c.Add(m.Value) - - case *prometheus.CounterVec: - if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) - } - - c.WithLabelValues(m.Labels...).Add(m.Value) - - default: - return fmt.Errorf("collector `%s` does not support method `Add`", m.Name) - } - - // RPC, set ok to true as return value. Need by rpc.Call reply argument - *ok = true - return nil -} - -// Sub subtract the value from the specific metric (gauge only). -func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) { - defer func() { - if r, fail := recover().(error); fail { - err = r - } - }() - - c := rpc.svc.Collector(m.Name) - if c == nil { - return fmt.Errorf("undefined collector `%s`", m.Name) - } - - switch c := c.(type) { - case prometheus.Gauge: - c.Sub(m.Value) - - case *prometheus.GaugeVec: - if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) - } - - c.WithLabelValues(m.Labels...).Sub(m.Value) - default: - return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name) - } - - // RPC, set ok to true as return value. Need by rpc.Call reply argument - *ok = true - return nil -} - -// Observe the value (histogram and summary only). -func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) { - defer func() { - if r, fail := recover().(error); fail { - err = r - } - }() - - c := rpc.svc.Collector(m.Name) - if c == nil { - return fmt.Errorf("undefined collector `%s`", m.Name) - } - - switch c := c.(type) { - case *prometheus.SummaryVec: - if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) - } - - c.WithLabelValues(m.Labels...).Observe(m.Value) - - case prometheus.Histogram: - c.Observe(m.Value) - - case *prometheus.HistogramVec: - if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) - } - - c.WithLabelValues(m.Labels...).Observe(m.Value) - default: - return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name) - } - - // RPC, set ok to true as return value. Need by rpc.Call reply argument - *ok = true - return nil -} -// Declare is used to register new collector in prometheus -// THE TYPES ARE: -// NamedCollector -> Collector with the name -// bool -> RPC reply value -// RETURNS: -// error -func (rpc *rpcServer) Declare(c *NamedCollector, ok *bool) (err error) { - // MustRegister could panic, so, to return error and not shutdown whole app - // we recover and return error - defer func() { - if r, fail := recover().(error); fail { - err = r - } - }() - - if rpc.svc.Collector(c.Name) != nil { - *ok = false - // alternative is to return error - // fmt.Errorf("tried to register existing collector with the name `%s`", c.Name) - return nil - } - - var collector prometheus.Collector - switch c.Type { - case Histogram: - opts := prometheus.HistogramOpts{ - Name: c.Name, - Namespace: c.Namespace, - Subsystem: c.Subsystem, - Help: c.Help, - Buckets: c.Buckets, - } - - if len(c.Labels) != 0 { - collector = prometheus.NewHistogramVec(opts, c.Labels) - } else { - collector = prometheus.NewHistogram(opts) - } - case Gauge: - opts := prometheus.GaugeOpts{ - Name: c.Name, - Namespace: c.Namespace, - Subsystem: c.Subsystem, - Help: c.Help, - } - - if len(c.Labels) != 0 { - collector = prometheus.NewGaugeVec(opts, c.Labels) - } else { - collector = prometheus.NewGauge(opts) - } - case Counter: - opts := prometheus.CounterOpts{ - Name: c.Name, - Namespace: c.Namespace, - Subsystem: c.Subsystem, - Help: c.Help, - } - - if len(c.Labels) != 0 { - collector = prometheus.NewCounterVec(opts, c.Labels) - } else { - collector = prometheus.NewCounter(opts) - } - case Summary: - opts := prometheus.SummaryOpts{ - Name: c.Name, - Namespace: c.Namespace, - Subsystem: c.Subsystem, - Help: c.Help, - } - - if len(c.Labels) != 0 { - collector = prometheus.NewSummaryVec(opts, c.Labels) - } else { - collector = prometheus.NewSummary(opts) - } - - default: - return fmt.Errorf("unknown collector type `%s`", c.Type) - - } - - // add collector to sync.Map - rpc.svc.collectors.Store(c.Name, collector) - // that method might panic, we handle it by recover - rpc.svc.MustRegister(collector) - - *ok = true - return nil -} - -// Set the metric value (only for gaude). -func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) { - defer func() { - if r, fail := recover().(error); fail { - err = r - } - }() - - c := rpc.svc.Collector(m.Name) - if c == nil { - return fmt.Errorf("undefined collector `%s`", m.Name) - } - - switch c := c.(type) { - case prometheus.Gauge: - c.Set(m.Value) - - case *prometheus.GaugeVec: - if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) - } - - c.WithLabelValues(m.Labels...).Set(m.Value) - - default: - return fmt.Errorf("collector `%s` does not support method `Set`", m.Name) - } - - // RPC, set ok to true as return value. Need by rpc.Call reply argument - *ok = true - return nil -} diff --git a/service/metrics/rpc_test.go b/service/metrics/rpc_test.go deleted file mode 100644 index 2fc4bc32..00000000 --- a/service/metrics/rpc_test.go +++ /dev/null @@ -1,861 +0,0 @@ -package metrics - -import ( - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/rpc" - "github.com/stretchr/testify/assert" - rpc2 "net/rpc" - "strconv" - "testing" - "time" -) - -var port = 5004 - -func setup(t *testing.T, metric string, portNum string) (*rpc2.Client, service.Container) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(rpc.ID, &rpc.Service{}) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{ - rpcCfg: `{"enable":true, "listen":"tcp://:` + strconv.Itoa(port) + `"}`, - metricsCfg: `{ - "address": "localhost:` + portNum + `", - "collect":{ - ` + metric + ` - } - }`})) - - // rotate ports for travis - port++ - - s, _ := c.Get(ID) - assert.NotNil(t, s) - - s2, _ := c.Get(rpc.ID) - rs := s2.(*rpc.Service) - - assert.True(t, s.(*Service).Enabled()) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - time.Sleep(time.Millisecond * 200) - - client, err := rs.Client() - assert.NoError(t, err) - if err != nil { - panic(err) - } - - return client, c -} - -func Test_Set_RPC(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge" - }`, - "2112", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Set", Metric{ - Name: "user_gauge", - Value: 100.0, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2112/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_gauge 100`) -} - -func Test_Set_RPC_Vector(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2113", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Set", Metric{ - Name: "user_gauge", - Value: 100.0, - Labels: []string{"core", "first"}, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2113/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_gauge{section="first",type="core"} 100`) -} - -func Test_Set_RPC_CollectorError(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2114", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Set", Metric{ - Name: "user_gauge_2", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) -} - -func Test_Set_RPC_MetricError(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2115", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Set", Metric{ - Name: "user_gauge", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) -} - -func Test_Set_RPC_MetricError_2(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2116", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Set", Metric{ - Name: "user_gauge", - Value: 100.0, - }, &ok)) -} - -func Test_Set_RPC_MetricError_3(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "histogram", - "labels": ["type", "section"] - }`, - "2117", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Set", Metric{ - Name: "user_gauge", - Value: 100.0, - }, &ok)) -} - -// sub - -func Test_Sub_RPC(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge" - }`, - "2118", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Add", Metric{ - Name: "user_gauge", - Value: 100.0, - }, &ok)) - assert.True(t, ok) - - assert.NoError(t, client.Call("metrics.Sub", Metric{ - Name: "user_gauge", - Value: 10.0, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2118/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_gauge 90`) -} - -func Test_Sub_RPC_Vector(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2119", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Add", Metric{ - Name: "user_gauge", - Value: 100.0, - Labels: []string{"core", "first"}, - }, &ok)) - assert.True(t, ok) - - assert.NoError(t, client.Call("metrics.Sub", Metric{ - Name: "user_gauge", - Value: 10.0, - Labels: []string{"core", "first"}, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2119/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_gauge{section="first",type="core"} 90`) -} - -func Test_Register_RPC_Histogram(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2319", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Declare", &NamedCollector{ - Name: "custom_histogram", - Collector: Collector{ - Namespace: "test_histogram", - Subsystem: "test_histogram", - Type: Histogram, - Help: "test_histogram", - Labels: nil, - Buckets: []float64{0.1, 0.2, 0.5}, - }, - }, &ok)) - assert.True(t, ok) - - var ok2 bool - // histogram does not support Add, should be an error - assert.Error(t, client.Call("metrics.Add", Metric{ - Name: "custom_histogram", - }, &ok2)) - // ok should became false - assert.False(t, ok2) - - out, _, err := get("http://localhost:2319/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `TYPE test_histogram_test_histogram_custom_histogram histogram`) - - // check buckets - assert.Contains(t, out, `test_histogram_test_histogram_custom_histogram_bucket{le="0.1"} 0`) - assert.Contains(t, out, `test_histogram_test_histogram_custom_histogram_bucket{le="0.2"} 0`) - assert.Contains(t, out, `test_histogram_test_histogram_custom_histogram_bucket{le="0.5"} 0`) -} - -func Test_Register_RPC_Gauge(t *testing.T) { - // FOR register method, setup used just to init the rpc - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2324", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Declare", &NamedCollector{ - Name: "custom_gauge", - Collector: Collector{ - Namespace: "test_gauge", - Subsystem: "test_gauge", - Type: Gauge, - Help: "test_gauge", - Labels: []string{"type", "section"}, - Buckets: nil, - }, - }, &ok)) - assert.True(t, ok) - - var ok2 bool - // Add to custom_gauge - assert.NoError(t, client.Call("metrics.Add", Metric{ - Name: "custom_gauge", - Value: 100.0, - Labels: []string{"core", "first"}, - }, &ok2)) - // ok should became true - assert.True(t, ok2) - - // Subtract from custom runtime metric - var ok3 bool - assert.NoError(t, client.Call("metrics.Sub", Metric{ - Name: "custom_gauge", - Value: 10.0, - Labels: []string{"core", "first"}, - }, &ok3)) - assert.True(t, ok3) - - out, _, err := get("http://localhost:2324/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `test_gauge_test_gauge_custom_gauge{section="first",type="core"} 90`) -} - -func Test_Register_RPC_Counter(t *testing.T) { - // FOR register method, setup used just to init the rpc - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2328", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Declare", &NamedCollector{ - Name: "custom_counter", - Collector: Collector{ - Namespace: "test_counter", - Subsystem: "test_counter", - Type: Counter, - Help: "test_counter", - Labels: []string{"type", "section"}, - Buckets: nil, - }, - }, &ok)) - assert.True(t, ok) - - var ok2 bool - // Add to custom_counter - assert.NoError(t, client.Call("metrics.Add", Metric{ - Name: "custom_counter", - Value: 100.0, - Labels: []string{"type2", "section2"}, - }, &ok2)) - // ok should became true - assert.True(t, ok2) - - out, _, err := get("http://localhost:2328/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `test_counter_test_counter_custom_counter{section="section2",type="type2"} 100`) -} - -func Test_Register_RPC_Summary(t *testing.T) { - // FOR register method, setup used just to init the rpc - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "6666", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Declare", &NamedCollector{ - Name: "custom_summary", - Collector: Collector{ - Namespace: "test_summary", - Subsystem: "test_summary", - Type: Summary, - Help: "test_summary", - Labels: nil, - Buckets: nil, - }, - }, &ok)) - assert.True(t, ok) - - var ok2 bool - // Add to custom_summary is not supported - assert.Error(t, client.Call("metrics.Add", Metric{ - Name: "custom_summary", - Value: 100.0, - Labels: []string{"type22", "section22"}, - }, &ok2)) - // ok should became false - assert.False(t, ok2) - - out, _, err := get("http://localhost:6666/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `test_summary_test_summary_custom_summary_sum 0`) - assert.Contains(t, out, `test_summary_test_summary_custom_summary_count 0`) -} - -func Test_Sub_RPC_CollectorError(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2120", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Sub", Metric{ - Name: "user_gauge_2", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) -} - -func Test_Sub_RPC_MetricError(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2121", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Sub", Metric{ - Name: "user_gauge", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) -} - -func Test_Sub_RPC_MetricError_2(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "gauge", - "labels": ["type", "section"] - }`, - "2122", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Sub", Metric{ - Name: "user_gauge", - Value: 100.0, - }, &ok)) -} - -func Test_Sub_RPC_MetricError_3(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "histogram", - "labels": ["type", "section"] - }`, - "2123", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Sub", Metric{ - Name: "user_gauge", - Value: 100.0, - }, &ok)) -} - -// -- observe - -func Test_Observe_RPC(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "histogram" - }`, - "2124", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2124/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_histogram`) -} - -func Test_Observe_RPC_Vector(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "histogram", - "labels": ["type", "section"] - }`, - "2125", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - Labels: []string{"core", "first"}, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2125/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_histogram`) -} - -func Test_Observe_RPC_CollectorError(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "histogram", - "labels": ["type", "section"] - }`, - "2126", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) -} - -func Test_Observe_RPC_MetricError(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "histogram", - "labels": ["type", "section"] - }`, - "2127", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) -} - -func Test_Observe_RPC_MetricError_2(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "histogram", - "labels": ["type", "section"] - }`, - "2128", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - }, &ok)) -} - -// -- observe summary - -func Test_Observe2_RPC(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "summary" - }`, - "2129", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2129/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_histogram`) -} - -func Test_Observe2_RPC_Invalid(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "summary" - }`, - "2130", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram_2", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) -} - -func Test_Observe2_RPC_Invalid_2(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "gauge" - }`, - "2131", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - }, &ok)) -} - -func Test_Observe2_RPC_Vector(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "summary", - "labels": ["type", "section"] - }`, - "2132", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - Labels: []string{"core", "first"}, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2132/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_histogram`) -} - -func Test_Observe2_RPC_CollectorError(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "summary", - "labels": ["type", "section"] - }`, - "2133", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) -} - -func Test_Observe2_RPC_MetricError(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "summary", - "labels": ["type", "section"] - }`, - "2134", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) -} - -func Test_Observe2_RPC_MetricError_2(t *testing.T) { - client, c := setup( - t, - `"user_histogram":{ - "type": "summary", - "labels": ["type", "section"] - }`, - "2135", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Observe", Metric{ - Name: "user_histogram", - Value: 100.0, - }, &ok)) -} - -// add -func Test_Add_RPC(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "counter" - }`, - "2136", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Add", Metric{ - Name: "user_gauge", - Value: 100.0, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2136/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_gauge 100`) -} - -func Test_Add_RPC_Vector(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "counter", - "labels": ["type", "section"] - }`, - "2137", - ) - defer c.Stop() - - var ok bool - assert.NoError(t, client.Call("metrics.Add", Metric{ - Name: "user_gauge", - Value: 100.0, - Labels: []string{"core", "first"}, - }, &ok)) - assert.True(t, ok) - - out, _, err := get("http://localhost:2137/metrics") - assert.NoError(t, err) - assert.Contains(t, out, `user_gauge{section="first",type="core"} 100`) -} - -func Test_Add_RPC_CollectorError(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "counter", - "labels": ["type", "section"] - }`, - "2138", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Add", Metric{ - Name: "user_gauge_2", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) - - assert.False(t, ok) -} - -func Test_Add_RPC_MetricError(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "counter", - "labels": ["type", "section"] - }`, - "2139", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Add", Metric{ - Name: "user_gauge", - Value: 100.0, - Labels: []string{"missing"}, - }, &ok)) - - assert.False(t, ok) -} - -func Test_Add_RPC_MetricError_2(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "counter", - "labels": ["type", "section"] - }`, - "2140", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Add", Metric{ - Name: "user_gauge", - Value: 100.0, - }, &ok)) - - assert.False(t, ok) -} - -func Test_Add_RPC_MetricError_3(t *testing.T) { - client, c := setup( - t, - `"user_gauge":{ - "type": "histogram", - "labels": ["type", "section"] - }`, - "2141", - ) - defer c.Stop() - - var ok bool - assert.Error(t, client.Call("metrics.Add", Metric{ - Name: "user_gauge", - Value: 100.0, - }, &ok)) -} diff --git a/service/metrics/service.go b/service/metrics/service.go deleted file mode 100644 index 4656ae04..00000000 --- a/service/metrics/service.go +++ /dev/null @@ -1,191 +0,0 @@ -package metrics - -// todo: declare metric at runtime - -import ( - "context" - "crypto/tls" - "fmt" - "net/http" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/sirupsen/logrus" - "github.com/spiral/roadrunner/service/rpc" - "golang.org/x/sys/cpu" -) - -const ( - // ID declares public service name. - ID = "metrics" - // maxHeaderSize declares max header size for prometheus server - maxHeaderSize = 1024 * 1024 * 100 // 104MB -) - -// Service to manage application metrics using Prometheus. -type Service struct { - cfg *Config - log *logrus.Logger - mu sync.Mutex - http *http.Server - collectors sync.Map - registry *prometheus.Registry -} - -// Init service. -func (s *Service) Init(cfg *Config, r *rpc.Service, log *logrus.Logger) (bool, error) { - s.cfg = cfg - s.log = log - s.registry = prometheus.NewRegistry() - - s.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) - s.registry.MustRegister(prometheus.NewGoCollector()) - - if r != nil { - if err := r.Register(ID, &rpcServer{s}); err != nil { - return false, err - } - } - - return true, nil -} - -// Enabled indicates that server is able to collect metrics. -func (s *Service) Enabled() bool { - return s.cfg != nil -} - -// Register new prometheus collector. -func (s *Service) Register(c prometheus.Collector) error { - return s.registry.Register(c) -} - -// MustRegister registers new collector or fails with panic. -func (s *Service) MustRegister(c prometheus.Collector) { - s.registry.MustRegister(c) -} - -// Serve prometheus metrics service. -func (s *Service) Serve() error { - // register application specific metrics - collectors, err := s.cfg.getCollectors() - if err != nil { - return err - } - - for name, collector := range collectors { - if err := s.registry.Register(collector); err != nil { - return err - } - - s.collectors.Store(name, collector) - } - - s.mu.Lock() - - var topCipherSuites []uint16 - var defaultCipherSuitesTLS13 []uint16 - - hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ - hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL - // Keep in sync with crypto/aes/cipher_s390x.go. - hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM) - - hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X - - if hasGCMAsm { - // If AES-GCM hardware is provided then prioritise AES-GCM - // cipher suites. - topCipherSuites = []uint16{ - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, - } - defaultCipherSuitesTLS13 = []uint16{ - tls.TLS_AES_128_GCM_SHA256, - tls.TLS_CHACHA20_POLY1305_SHA256, - tls.TLS_AES_256_GCM_SHA384, - } - } else { - // Without AES-GCM hardware, we put the ChaCha20-Poly1305 - // cipher suites first. - topCipherSuites = []uint16{ - tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, - } - defaultCipherSuitesTLS13 = []uint16{ - tls.TLS_CHACHA20_POLY1305_SHA256, - tls.TLS_AES_128_GCM_SHA256, - tls.TLS_AES_256_GCM_SHA384, - } - } - - DefaultCipherSuites := make([]uint16, 0, 22) - DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) - DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) - - s.http = &http.Server{ - Addr: s.cfg.Address, - Handler: promhttp.HandlerFor(s.registry, promhttp.HandlerOpts{}), - IdleTimeout: time.Hour * 24, - ReadTimeout: time.Minute * 60, - MaxHeaderBytes: maxHeaderSize, - ReadHeaderTimeout: time.Minute * 60, - WriteTimeout: time.Minute * 60, - TLSConfig: &tls.Config{ - CurvePreferences: []tls.CurveID{ - tls.CurveP256, - tls.CurveP384, - tls.CurveP521, - tls.X25519, - }, - CipherSuites: DefaultCipherSuites, - MinVersion: tls.VersionTLS12, - PreferServerCipherSuites: true, - }, - } - s.mu.Unlock() - - err = s.http.ListenAndServe() - if err != nil && err != http.ErrServerClosed { - return err - } - - return nil -} - -// Stop prometheus metrics service. -func (s *Service) Stop() { - s.mu.Lock() - defer s.mu.Unlock() - - if s.http != nil { - // gracefully stop server - go func() { - err := s.http.Shutdown(context.Background()) - if err != nil { - // Function should be Stop() error - s.log.Error(fmt.Errorf("error shutting down the metrics server: error %v", err)) - } - }() - } -} - -// Collector returns application specific collector by name or nil if collector not found. -func (s *Service) Collector(name string) prometheus.Collector { - collector, ok := s.collectors.Load(name) - if !ok { - return nil - } - - return collector.(prometheus.Collector) -} diff --git a/service/metrics/service_test.go b/service/metrics/service_test.go deleted file mode 100644 index cdb81147..00000000 --- a/service/metrics/service_test.go +++ /dev/null @@ -1,247 +0,0 @@ -package metrics - -import ( - json "github.com/json-iterator/go" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" - "github.com/sirupsen/logrus/hooks/test" - "github.com/spiral/roadrunner/service" - "github.com/spiral/roadrunner/service/rpc" - "github.com/stretchr/testify/assert" - "io/ioutil" - "net/http" - "testing" - "time" -) - -type testCfg struct { - rpcCfg string - metricsCfg string - target string -} - -func (cfg *testCfg) Get(name string) service.Config { - if name == ID { - return &testCfg{target: cfg.metricsCfg} - } - - if name == rpc.ID { - return &testCfg{target: cfg.rpcCfg} - } - - return nil -} - -func (cfg *testCfg) Unmarshal(out interface{}) error { - j := json.ConfigCompatibleWithStandardLibrary - err := j.Unmarshal([]byte(cfg.target), out) - return err -} - -// get request and return body -func get(url string) (string, *http.Response, error) { - r, err := http.Get(url) - if err != nil { - return "", nil, err - } - - b, err := ioutil.ReadAll(r.Body) - if err != nil { - return "", nil, err - } - - err = r.Body.Close() - if err != nil { - return "", nil, err - } - return string(b), r, err -} - -func TestService_Serve(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ - "address": "localhost:2116" - }`})) - - s, _ := c.Get(ID) - assert.NotNil(t, s) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - time.Sleep(time.Millisecond * 100) - defer c.Stop() - - out, _, err := get("http://localhost:2116/metrics") - assert.NoError(t, err) - - assert.Contains(t, out, "go_gc_duration_seconds") -} - -func Test_ServiceCustomMetric(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ - "address": "localhost:2115" - }`})) - - s, _ := c.Get(ID) - assert.NotNil(t, s) - - collector := prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "my_gauge", - Help: "My gauge value", - }) - - assert.NoError(t, s.(*Service).Register(collector)) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - time.Sleep(time.Millisecond * 100) - defer c.Stop() - - collector.Set(100) - - out, _, err := get("http://localhost:2115/metrics") - assert.NoError(t, err) - - assert.Contains(t, out, "my_gauge 100") -} - -func Test_ServiceCustomMetricMust(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ - "address": "localhost:2114" - }`})) - - s, _ := c.Get(ID) - assert.NotNil(t, s) - - collector := prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "my_gauge_2", - Help: "My gauge value", - }) - - s.(*Service).MustRegister(collector) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - time.Sleep(time.Millisecond * 100) - defer c.Stop() - - collector.Set(100) - - out, _, err := get("http://localhost:2114/metrics") - assert.NoError(t, err) - - assert.Contains(t, out, "my_gauge_2 100") -} - -func Test_ConfiguredMetric(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ - "address": "localhost:2113", - "collect":{ - "user_gauge":{ - "type": "gauge" - } - } - }`})) - - s, _ := c.Get(ID) - assert.NotNil(t, s) - - assert.True(t, s.(*Service).Enabled()) - - go func() { - err := c.Serve() - if err != nil { - t.Errorf("error during the Serve: error %v", err) - } - }() - time.Sleep(time.Millisecond * 100) - defer c.Stop() - - s.(*Service).Collector("user_gauge").(prometheus.Gauge).Set(100) - - assert.Nil(t, s.(*Service).Collector("invalid")) - - out, _, err := get("http://localhost:2113/metrics") - assert.NoError(t, err) - - assert.Contains(t, out, "user_gauge 100") -} - -func Test_ConfiguredDuplicateMetric(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ - "address": "localhost:2112", - "collect":{ - "go_gc_duration_seconds":{ - "type": "gauge" - } - } - }`})) - - s, _ := c.Get(ID) - assert.NotNil(t, s) - - assert.True(t, s.(*Service).Enabled()) - - assert.Error(t, c.Serve()) -} - -func Test_ConfiguredInvalidMetric(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) - - c := service.NewContainer(logger) - c.Register(ID, &Service{}) - - assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ - "address": "localhost:2112", - "collect":{ - "user_gauge":{ - "type": "invalid" - } - } - - }`})) - - assert.Error(t, c.Serve()) -} |