diff options
author | Anton Titov <[email protected]> | 2019-06-27 13:00:34 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2019-06-27 13:00:34 +0300 |
commit | ce7f331a5e6c6129331e06224865abdb1298584d (patch) | |
tree | 4fdf5b9158cf481ee77c408ea8d57d83a7e692f3 /service | |
parent | 5b6e0a535fef745594a4966c724509dcda05b422 (diff) | |
parent | 1633d128309765536e7cbb176225926efda7a33c (diff) |
Merge pull request #168 from spiral/feature/metrics
Feature/metrics
Diffstat (limited to 'service')
-rw-r--r-- | service/metrics/config.go | 119 | ||||
-rw-r--r-- | service/metrics/config_test.go | 27 | ||||
-rw-r--r-- | service/metrics/rpc.go | 156 | ||||
-rw-r--r-- | service/metrics/rpc_test.go | 1 | ||||
-rw-r--r-- | service/metrics/service.go | 91 | ||||
-rw-r--r-- | service/metrics/service_test.go | 190 |
6 files changed, 584 insertions, 0 deletions
diff --git a/service/metrics/config.go b/service/metrics/config.go new file mode 100644 index 00000000..b9b21ea9 --- /dev/null +++ b/service/metrics/config.go @@ -0,0 +1,119 @@ +package metrics + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/roadrunner/service" +) + +type Config struct { + // Address to listen + Address string + + // Collect define application specific metrics. + Collect map[string]Collector +} + +// Collector describes single application specific metric. +type Collector struct { + // Namespace of the metric. + Namespace string + + // Subsystem of the metric. + Subsystem string + + // Collector type (histogram, gauge, counter, summary). + Type string + + // Help of collector. + Help string + + // Labels for vectorized metrics. + Labels []string + + // Buckets for histogram metric. + Buckets []float64 +} + +// Hydrate configuration. +func (c *Config) Hydrate(cfg service.Config) error { + return cfg.Unmarshal(c) +} + +// register application specific metrics. +func (c *Config) initCollectors() (map[string]prometheus.Collector, error) { + if c.Collect == nil { + return nil, nil + } + + collectors := make(map[string]prometheus.Collector) + + for name, m := range c.Collect { + var collector prometheus.Collector + switch m.Type { + case "histogram": + opts := prometheus.HistogramOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + Buckets: m.Buckets, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewHistogramVec(opts, m.Labels) + } else { + collector = prometheus.NewHistogram(opts) + } + case "gauge": + opts := prometheus.GaugeOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewGaugeVec(opts, m.Labels) + } else { + collector = prometheus.NewGauge(opts) + } + case "counter": + opts := prometheus.CounterOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewCounterVec(opts, m.Labels) + } else { + collector = prometheus.NewCounter(opts) + } + case "summary": + opts := prometheus.SummaryOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewSummaryVec(opts, m.Labels) + } else { + collector = prometheus.NewSummary(opts) + } + default: + return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name) + } + + if err := prometheus.Register(collector); err != nil { + return nil, err + } + + collectors[name] = collector + } + + return collectors, nil +} diff --git a/service/metrics/config_test.go b/service/metrics/config_test.go new file mode 100644 index 00000000..bd02d1cf --- /dev/null +++ b/service/metrics/config_test.go @@ -0,0 +1,27 @@ +package metrics + +import ( + "encoding/json" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error1(t *testing.T) { + cfg := &mockCfg{`{"request": {"From": "Something"}}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} diff --git a/service/metrics/rpc.go b/service/metrics/rpc.go new file mode 100644 index 00000000..30ad6c62 --- /dev/null +++ b/service/metrics/rpc.go @@ -0,0 +1,156 @@ +package metrics + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" +) + +type rpcServer struct{ svc *Service } + +// Metric represent single metric produced by the application. +type Metric struct { + // Collector name. + Name string + + // Collector value. + Value float64 + + // Labels associated with metric. Only for vector metrics. + Labels []string +} + +// Add new metric to the designated collector. +func (rpc *rpcServer) Add(m *Metric, ok *bool) error { + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c.(type) { + case prometheus.Gauge: + c.(prometheus.Gauge).Add(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Add(m.Value) + + case prometheus.Counter: + c.(prometheus.Counter).Add(m.Value) + + case *prometheus.CounterVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.CounterVec).WithLabelValues(m.Labels...).Add(m.Value) + + case prometheus.Summary: + c.(prometheus.Counter).Add(m.Value) + + case *prometheus.SummaryVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value) + + case prometheus.Histogram: + c.(prometheus.Histogram).Observe(m.Value) + + case *prometheus.HistogramVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value) + } + + *ok = true + return nil +} + +// Sub subtract the value from the specific metric (gauge only). +func (rpc *rpcServer) Sub(m *Metric, ok *bool) error { + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c.(type) { + case prometheus.Gauge: + c.(prometheus.Gauge).Sub(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Sub(m.Value) + default: + return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name) + } + + *ok = true + return nil +} + +// Observe the value (histogram and summary only). +func (rpc *rpcServer) Observe(m *Metric, ok *bool) error { + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c.(type) { + case *prometheus.SummaryVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value) + + case prometheus.Histogram: + c.(prometheus.Histogram).Observe(m.Value) + + case *prometheus.HistogramVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value) + default: + return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name) + } + + *ok = true + return nil +} + +// Set the metric value (only for gaude). +func (rpc *rpcServer) Set(m *Metric, ok *bool) error { + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c.(type) { + case prometheus.Gauge: + c.(prometheus.Gauge).Set(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Set(m.Value) + + default: + return fmt.Errorf("collector `%s` does not support method `Set`", m.Name) + } + + *ok = true + return nil +} diff --git a/service/metrics/rpc_test.go b/service/metrics/rpc_test.go new file mode 100644 index 00000000..1abe097a --- /dev/null +++ b/service/metrics/rpc_test.go @@ -0,0 +1 @@ +package metrics diff --git a/service/metrics/service.go b/service/metrics/service.go new file mode 100644 index 00000000..2c94568d --- /dev/null +++ b/service/metrics/service.go @@ -0,0 +1,91 @@ +package metrics + +import ( + "context" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spiral/roadrunner/service/rpc" + "net/http" + "sync" +) + +// ID declares public service name. +const ID = "metrics" + +// Service to manage application metrics using Prometheus. +type Service struct { + cfg *Config + mu sync.Mutex + http *http.Server + collectors sync.Map +} + +// Init service. +func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) { + s.cfg = cfg + + if r != nil { + if err := r.Register(ID, &rpcServer{s}); err != nil { + return false, err + } + } + + return true, nil +} + +// Enabled indicates that server is able to collect metrics. +func (s *Service) Enabled() bool { + return s.cfg != nil +} + +// Register new prometheus collector. +func (s *Service) Register(c prometheus.Collector) error { + return prometheus.Register(c) +} + +// MustRegister registers new collector or fails with panic. +func (s *Service) MustRegister(c prometheus.Collector) { + if err := prometheus.Register(c); err != nil { + panic(err) + } +} + +// Serve prometheus metrics service. +func (s *Service) Serve() error { + // register application specific metrics + collectors, err := s.cfg.initCollectors() + if err != nil { + return err + } + + for name, collector := range collectors { + s.collectors.Store(name, collector) + } + + s.mu.Lock() + s.http = &http.Server{Addr: s.cfg.Address, Handler: promhttp.Handler()} + s.mu.Unlock() + + return s.http.ListenAndServe() +} + +// Stop prometheus metrics service. +func (s *Service) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.http != nil { + // gracefully stop server + go s.http.Shutdown(context.Background()) + } +} + +// Collector returns application specific collector by name or nil if collector not found. +func (s *Service) Collector(name string) prometheus.Collector { + collector, ok := s.collectors.Load(name) + if !ok { + return nil + } + + return collector.(prometheus.Collector) +} diff --git a/service/metrics/service_test.go b/service/metrics/service_test.go new file mode 100644 index 00000000..513b3042 --- /dev/null +++ b/service/metrics/service_test.go @@ -0,0 +1,190 @@ +package metrics + +import ( + "encoding/json" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/http" + "testing" + "time" +) + +type testCfg struct { + metricsCfg string + target string +} + +func (cfg *testCfg) Get(name string) service.Config { + if name == ID { + return &testCfg{target: cfg.metricsCfg} + } + + return nil +} + +func (cfg *testCfg) Unmarshal(out interface{}) error { + err := json.Unmarshal([]byte(cfg.target), out) + return err +} + +// get request and return body +func get(url string) (string, *http.Response, error) { + r, err := http.Get(url) + if err != nil { + return "", nil, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + return string(b), r, err +} + +func TestService_Serve(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112" + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + out, _, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "go_gc_duration_seconds") +} + +func Test_ServiceCustomMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112" + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + collector := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "my_gauge", + Help: "My gauge value", + }) + + assert.NoError(t, s.(*Service).Register(collector)) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + collector.Set(100) + + out, _, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "my_gauge 100") +} + +func Test_ServiceCustomMetricMust(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112" + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + collector := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "my_gauge_2", + Help: "My gauge value", + }) + + s.(*Service).MustRegister(collector) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + collector.Set(100) + + out, _, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "my_gauge_2 100") +} + +func Test_ConfiguredMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112", + "collect":{ + "user_gauge":{ + "type": "gauge" + } + } + + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + assert.True(t, s.(*Service).Enabled()) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + s.(*Service).Collector("user_gauge").(prometheus.Gauge).Set(100) + + assert.Nil(t, s.(*Service).Collector("invalid")) + + out, _, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "user_gauge 100") +} + +func Test_ConfiguredInvalidMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112", + "collect":{ + "user_gauge":{ + "type": "invalid" + } + } + + }`})) + + assert.Error(t, c.Serve()) +} |