summaryrefslogtreecommitdiff
path: root/service
diff options
context:
space:
mode:
Diffstat (limited to 'service')
-rw-r--r--service/metrics/config.go119
-rw-r--r--service/metrics/config_test.go27
-rw-r--r--service/metrics/rpc.go156
-rw-r--r--service/metrics/rpc_test.go1
-rw-r--r--service/metrics/service.go91
-rw-r--r--service/metrics/service_test.go190
6 files changed, 584 insertions, 0 deletions
diff --git a/service/metrics/config.go b/service/metrics/config.go
new file mode 100644
index 00000000..b9b21ea9
--- /dev/null
+++ b/service/metrics/config.go
@@ -0,0 +1,119 @@
+package metrics
+
+import (
+ "fmt"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/roadrunner/service"
+)
+
+type Config struct {
+ // Address to listen
+ Address string
+
+ // Collect define application specific metrics.
+ Collect map[string]Collector
+}
+
+// Collector describes single application specific metric.
+type Collector struct {
+ // Namespace of the metric.
+ Namespace string
+
+ // Subsystem of the metric.
+ Subsystem string
+
+ // Collector type (histogram, gauge, counter, summary).
+ Type string
+
+ // Help of collector.
+ Help string
+
+ // Labels for vectorized metrics.
+ Labels []string
+
+ // Buckets for histogram metric.
+ Buckets []float64
+}
+
+// Hydrate configuration.
+func (c *Config) Hydrate(cfg service.Config) error {
+ return cfg.Unmarshal(c)
+}
+
+// register application specific metrics.
+func (c *Config) initCollectors() (map[string]prometheus.Collector, error) {
+ if c.Collect == nil {
+ return nil, nil
+ }
+
+ collectors := make(map[string]prometheus.Collector)
+
+ for name, m := range c.Collect {
+ var collector prometheus.Collector
+ switch m.Type {
+ case "histogram":
+ opts := prometheus.HistogramOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ Buckets: m.Buckets,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewHistogramVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewHistogram(opts)
+ }
+ case "gauge":
+ opts := prometheus.GaugeOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewGaugeVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewGauge(opts)
+ }
+ case "counter":
+ opts := prometheus.CounterOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewCounterVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewCounter(opts)
+ }
+ case "summary":
+ opts := prometheus.SummaryOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewSummaryVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewSummary(opts)
+ }
+ default:
+ return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name)
+ }
+
+ if err := prometheus.Register(collector); err != nil {
+ return nil, err
+ }
+
+ collectors[name] = collector
+ }
+
+ return collectors, nil
+}
diff --git a/service/metrics/config_test.go b/service/metrics/config_test.go
new file mode 100644
index 00000000..bd02d1cf
--- /dev/null
+++ b/service/metrics/config_test.go
@@ -0,0 +1,27 @@
+package metrics
+
+import (
+ "encoding/json"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_Config_Hydrate_Error1(t *testing.T) {
+ cfg := &mockCfg{`{"request": {"From": "Something"}}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Error2(t *testing.T) {
+ cfg := &mockCfg{`{"dir": "/dir/"`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
diff --git a/service/metrics/rpc.go b/service/metrics/rpc.go
new file mode 100644
index 00000000..30ad6c62
--- /dev/null
+++ b/service/metrics/rpc.go
@@ -0,0 +1,156 @@
+package metrics
+
+import (
+ "fmt"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+type rpcServer struct{ svc *Service }
+
+// Metric represent single metric produced by the application.
+type Metric struct {
+ // Collector name.
+ Name string
+
+ // Collector value.
+ Value float64
+
+ // Labels associated with metric. Only for vector metrics.
+ Labels []string
+}
+
+// Add new metric to the designated collector.
+func (rpc *rpcServer) Add(m *Metric, ok *bool) error {
+ c := rpc.svc.Collector(m.Name)
+ if c == nil {
+ return fmt.Errorf("undefined collector `%s`", m.Name)
+ }
+
+ switch c.(type) {
+ case prometheus.Gauge:
+ c.(prometheus.Gauge).Add(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Add(m.Value)
+
+ case prometheus.Counter:
+ c.(prometheus.Counter).Add(m.Value)
+
+ case *prometheus.CounterVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.CounterVec).WithLabelValues(m.Labels...).Add(m.Value)
+
+ case prometheus.Summary:
+ c.(prometheus.Counter).Add(m.Value)
+
+ case *prometheus.SummaryVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value)
+
+ case prometheus.Histogram:
+ c.(prometheus.Histogram).Observe(m.Value)
+
+ case *prometheus.HistogramVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value)
+ }
+
+ *ok = true
+ return nil
+}
+
+// Sub subtract the value from the specific metric (gauge only).
+func (rpc *rpcServer) Sub(m *Metric, ok *bool) error {
+ c := rpc.svc.Collector(m.Name)
+ if c == nil {
+ return fmt.Errorf("undefined collector `%s`", m.Name)
+ }
+
+ switch c.(type) {
+ case prometheus.Gauge:
+ c.(prometheus.Gauge).Sub(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Sub(m.Value)
+ default:
+ return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name)
+ }
+
+ *ok = true
+ return nil
+}
+
+// Observe the value (histogram and summary only).
+func (rpc *rpcServer) Observe(m *Metric, ok *bool) error {
+ c := rpc.svc.Collector(m.Name)
+ if c == nil {
+ return fmt.Errorf("undefined collector `%s`", m.Name)
+ }
+
+ switch c.(type) {
+ case *prometheus.SummaryVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value)
+
+ case prometheus.Histogram:
+ c.(prometheus.Histogram).Observe(m.Value)
+
+ case *prometheus.HistogramVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value)
+ default:
+ return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name)
+ }
+
+ *ok = true
+ return nil
+}
+
+// Set the metric value (only for gaude).
+func (rpc *rpcServer) Set(m *Metric, ok *bool) error {
+ c := rpc.svc.Collector(m.Name)
+ if c == nil {
+ return fmt.Errorf("undefined collector `%s`", m.Name)
+ }
+
+ switch c.(type) {
+ case prometheus.Gauge:
+ c.(prometheus.Gauge).Set(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Set(m.Value)
+
+ default:
+ return fmt.Errorf("collector `%s` does not support method `Set`", m.Name)
+ }
+
+ *ok = true
+ return nil
+}
diff --git a/service/metrics/rpc_test.go b/service/metrics/rpc_test.go
new file mode 100644
index 00000000..1abe097a
--- /dev/null
+++ b/service/metrics/rpc_test.go
@@ -0,0 +1 @@
+package metrics
diff --git a/service/metrics/service.go b/service/metrics/service.go
new file mode 100644
index 00000000..2c94568d
--- /dev/null
+++ b/service/metrics/service.go
@@ -0,0 +1,91 @@
+package metrics
+
+import (
+ "context"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/spiral/roadrunner/service/rpc"
+ "net/http"
+ "sync"
+)
+
+// ID declares public service name.
+const ID = "metrics"
+
+// Service to manage application metrics using Prometheus.
+type Service struct {
+ cfg *Config
+ mu sync.Mutex
+ http *http.Server
+ collectors sync.Map
+}
+
+// Init service.
+func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) {
+ s.cfg = cfg
+
+ if r != nil {
+ if err := r.Register(ID, &rpcServer{s}); err != nil {
+ return false, err
+ }
+ }
+
+ return true, nil
+}
+
+// Enabled indicates that server is able to collect metrics.
+func (s *Service) Enabled() bool {
+ return s.cfg != nil
+}
+
+// Register new prometheus collector.
+func (s *Service) Register(c prometheus.Collector) error {
+ return prometheus.Register(c)
+}
+
+// MustRegister registers new collector or fails with panic.
+func (s *Service) MustRegister(c prometheus.Collector) {
+ if err := prometheus.Register(c); err != nil {
+ panic(err)
+ }
+}
+
+// Serve prometheus metrics service.
+func (s *Service) Serve() error {
+ // register application specific metrics
+ collectors, err := s.cfg.initCollectors()
+ if err != nil {
+ return err
+ }
+
+ for name, collector := range collectors {
+ s.collectors.Store(name, collector)
+ }
+
+ s.mu.Lock()
+ s.http = &http.Server{Addr: s.cfg.Address, Handler: promhttp.Handler()}
+ s.mu.Unlock()
+
+ return s.http.ListenAndServe()
+}
+
+// Stop prometheus metrics service.
+func (s *Service) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.http != nil {
+ // gracefully stop server
+ go s.http.Shutdown(context.Background())
+ }
+}
+
+// Collector returns application specific collector by name or nil if collector not found.
+func (s *Service) Collector(name string) prometheus.Collector {
+ collector, ok := s.collectors.Load(name)
+ if !ok {
+ return nil
+ }
+
+ return collector.(prometheus.Collector)
+}
diff --git a/service/metrics/service_test.go b/service/metrics/service_test.go
new file mode 100644
index 00000000..513b3042
--- /dev/null
+++ b/service/metrics/service_test.go
@@ -0,0 +1,190 @@
+package metrics
+
+import (
+ "encoding/json"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "io/ioutil"
+ "net/http"
+ "testing"
+ "time"
+)
+
+type testCfg struct {
+ metricsCfg string
+ target string
+}
+
+func (cfg *testCfg) Get(name string) service.Config {
+ if name == ID {
+ return &testCfg{target: cfg.metricsCfg}
+ }
+
+ return nil
+}
+
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ err := json.Unmarshal([]byte(cfg.target), out)
+ return err
+}
+
+// get request and return body
+func get(url string) (string, *http.Response, error) {
+ r, err := http.Get(url)
+ if err != nil {
+ return "", nil, err
+ }
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ return string(b), r, err
+}
+
+func TestService_Serve(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112"
+ }`}))
+
+ s, _ := c.Get(ID)
+ assert.NotNil(t, s)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ out, _, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+
+ assert.Contains(t, out, "go_gc_duration_seconds")
+}
+
+func Test_ServiceCustomMetric(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112"
+ }`}))
+
+ s, _ := c.Get(ID)
+ assert.NotNil(t, s)
+
+ collector := prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "my_gauge",
+ Help: "My gauge value",
+ })
+
+ assert.NoError(t, s.(*Service).Register(collector))
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ collector.Set(100)
+
+ out, _, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+
+ assert.Contains(t, out, "my_gauge 100")
+}
+
+func Test_ServiceCustomMetricMust(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112"
+ }`}))
+
+ s, _ := c.Get(ID)
+ assert.NotNil(t, s)
+
+ collector := prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "my_gauge_2",
+ Help: "My gauge value",
+ })
+
+ s.(*Service).MustRegister(collector)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ collector.Set(100)
+
+ out, _, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+
+ assert.Contains(t, out, "my_gauge_2 100")
+}
+
+func Test_ConfiguredMetric(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112",
+ "collect":{
+ "user_gauge":{
+ "type": "gauge"
+ }
+ }
+
+ }`}))
+
+ s, _ := c.Get(ID)
+ assert.NotNil(t, s)
+
+ assert.True(t, s.(*Service).Enabled())
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ s.(*Service).Collector("user_gauge").(prometheus.Gauge).Set(100)
+
+ assert.Nil(t, s.(*Service).Collector("invalid"))
+
+ out, _, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+
+ assert.Contains(t, out, "user_gauge 100")
+}
+
+func Test_ConfiguredInvalidMetric(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112",
+ "collect":{
+ "user_gauge":{
+ "type": "invalid"
+ }
+ }
+
+ }`}))
+
+ assert.Error(t, c.Serve())
+}