diff options
-rw-r--r-- | metrics/interface.go | 9 | ||||
-rw-r--r-- | plugins/metrics/config.go | 136 | ||||
-rw-r--r-- | plugins/metrics/config_test.go | 75 | ||||
-rw-r--r-- | plugins/metrics/plugin.go | 193 | ||||
-rw-r--r-- | plugins/metrics/plugin_test.go | 247 | ||||
-rw-r--r-- | plugins/metrics/rpc.go | 260 | ||||
-rw-r--r-- | plugins/metrics/rpc_test.go | 861 |
7 files changed, 1781 insertions, 0 deletions
diff --git a/metrics/interface.go b/metrics/interface.go new file mode 100644 index 00000000..8207fb51 --- /dev/null +++ b/metrics/interface.go @@ -0,0 +1,9 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +type StatProvider interface { + MetricsCollector() prometheus.Collector +} diff --git a/plugins/metrics/config.go b/plugins/metrics/config.go new file mode 100644 index 00000000..a7919654 --- /dev/null +++ b/plugins/metrics/config.go @@ -0,0 +1,136 @@ +package metrics + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" +) + +// Config configures metrics service. +type Config struct { + // Address to listen + Address string + + // Collect define application specific metrics. + Collect map[string]Collector +} + +type NamedCollector struct { + // Name of the collector + Name string `json:"name"` + + // Collector structure + Collector `json:"collector"` +} + +// CollectorType represents prometheus collector types +type CollectorType string + +const ( + // Histogram type + Histogram CollectorType = "histogram" + + // Gauge type + Gauge CollectorType = "gauge" + + // Counter type + Counter CollectorType = "counter" + + // Summary type + Summary CollectorType = "summary" +) + +// Collector describes single application specific metric. +type Collector struct { + // Namespace of the metric. + Namespace string `json:"namespace"` + // Subsystem of the metric. + Subsystem string `json:"subsystem"` + // Collector type (histogram, gauge, counter, summary). + Type CollectorType `json:"type"` + // Help of collector. + Help string `json:"help"` + // Labels for vectorized metrics. + Labels []string `json:"labels"` + // Buckets for histogram metric. + Buckets []float64 `json:"buckets"` +} + +// Hydrate configuration. +//func (c *Config) Hydrate(cfg service.Config) error { +// return cfg.Unmarshal(c) +//} + +// register application specific metrics. +func (c *Config) getCollectors() (map[string]prometheus.Collector, error) { + if c.Collect == nil { + return nil, nil + } + + collectors := make(map[string]prometheus.Collector) + + for name, m := range c.Collect { + var collector prometheus.Collector + switch m.Type { + case Histogram: + opts := prometheus.HistogramOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + Buckets: m.Buckets, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewHistogramVec(opts, m.Labels) + } else { + collector = prometheus.NewHistogram(opts) + } + case Gauge: + opts := prometheus.GaugeOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewGaugeVec(opts, m.Labels) + } else { + collector = prometheus.NewGauge(opts) + } + case Counter: + opts := prometheus.CounterOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewCounterVec(opts, m.Labels) + } else { + collector = prometheus.NewCounter(opts) + } + case Summary: + opts := prometheus.SummaryOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewSummaryVec(opts, m.Labels) + } else { + collector = prometheus.NewSummary(opts) + } + default: + return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name) + } + + collectors[name] = collector + } + + return collectors, nil +} diff --git a/plugins/metrics/config_test.go b/plugins/metrics/config_test.go new file mode 100644 index 00000000..a64e9047 --- /dev/null +++ b/plugins/metrics/config_test.go @@ -0,0 +1,75 @@ +package metrics + +import ( + json "github.com/json-iterator/go" + "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { + j := json.ConfigCompatibleWithStandardLibrary + return j.Unmarshal([]byte(cfg.cfg), out) +} + +func Test_Config_Hydrate_Error1(t *testing.T) { + cfg := &mockCfg{`{"request": {"From": "Something"}}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Metrics(t *testing.T) { + cfg := &mockCfg{`{ +"collect":{ + "metric1":{"type": "gauge"}, + "metric2":{ "type": "counter"}, + "metric3":{"type": "summary"}, + "metric4":{"type": "histogram"} +} +}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + m, err := c.getCollectors() + assert.NoError(t, err) + + assert.IsType(t, prometheus.NewGauge(prometheus.GaugeOpts{}), m["metric1"]) + assert.IsType(t, prometheus.NewCounter(prometheus.CounterOpts{}), m["metric2"]) + assert.IsType(t, prometheus.NewSummary(prometheus.SummaryOpts{}), m["metric3"]) + assert.IsType(t, prometheus.NewHistogram(prometheus.HistogramOpts{}), m["metric4"]) +} + +func Test_Config_MetricsVector(t *testing.T) { + cfg := &mockCfg{`{ +"collect":{ + "metric1":{"type": "gauge","labels":["label"]}, + "metric2":{ "type": "counter","labels":["label"]}, + "metric3":{"type": "summary","labels":["label"]}, + "metric4":{"type": "histogram","labels":["label"]} +} +}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + + m, err := c.getCollectors() + assert.NoError(t, err) + + assert.IsType(t, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{}), m["metric1"]) + assert.IsType(t, prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{}), m["metric2"]) + assert.IsType(t, prometheus.NewSummaryVec(prometheus.SummaryOpts{}, []string{}), m["metric3"]) + assert.IsType(t, prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{}), m["metric4"]) +} diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go new file mode 100644 index 00000000..b9b79d95 --- /dev/null +++ b/plugins/metrics/plugin.go @@ -0,0 +1,193 @@ +package metrics + +// todo: declare metric at runtime + +import ( + "context" + "crypto/tls" + "net/http" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/log" + "github.com/spiral/roadrunner/v2/metrics" + "golang.org/x/sys/cpu" +) + +const ( + // ID declares public service name. + ID = "metrics" + // maxHeaderSize declares max header size for prometheus server + maxHeaderSize = 1024 * 1024 * 100 // 104MB +) + +// Plugin to manage application metrics using Prometheus. +type Plugin struct { + cfg Config + log log.Logger + mu sync.Mutex // all receivers are pointers + http *http.Server + collectors []prometheus.Collector //sync.Map // all receivers are pointers + registry *prometheus.Registry +} + +// Init service. +func (m *Plugin) Init(cfg Config, log log.Logger) (bool, error) { + m.cfg = cfg + m.log = log + m.registry = prometheus.NewRegistry() + + m.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + m.registry.MustRegister(prometheus.NewGoCollector()) + + //if r != nil { + // if err := r.Register(ID, &rpcServer{s}); err != nil { + // return false, err + // } + //} + + return true, nil +} + +// Enabled indicates that server is able to collect metrics. +//func (m *Plugin) Enabled() bool { +// return m.cfg != nil +//} +// +//// Register new prometheus collector. +//func (m *Plugin) Register(c prometheus.Collector) error { +// return m.registry.Register(c) +//} + +// MustRegister registers new collector or fails with panic. +func (m *Plugin) MustRegister(c prometheus.Collector) { + m.registry.MustRegister(c) +} + +// Serve prometheus metrics service. +func (m *Plugin) Serve() error { + // register application specific metrics + collectors, err := m.cfg.getCollectors() + if err != nil { + return err + } + + for name, collector := range collectors { + if err := m.registry.Register(collector); err != nil { + return err + } + + m.collectors.Store(name, collector) + } + + m.mu.Lock() + + var topCipherSuites []uint16 + var defaultCipherSuitesTLS13 []uint16 + + hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ + hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL + // Keep in sync with crypto/aes/cipher_s390x.go. + hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM) + + hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X + + if hasGCMAsm { + // If AES-GCM hardware is provided then prioritise AES-GCM + // cipher suites. + topCipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + } + defaultCipherSuitesTLS13 = []uint16{ + tls.TLS_AES_128_GCM_SHA256, + tls.TLS_CHACHA20_POLY1305_SHA256, + tls.TLS_AES_256_GCM_SHA384, + } + } else { + // Without AES-GCM hardware, we put the ChaCha20-Poly1305 + // cipher suites first. + topCipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + } + defaultCipherSuitesTLS13 = []uint16{ + tls.TLS_CHACHA20_POLY1305_SHA256, + tls.TLS_AES_128_GCM_SHA256, + tls.TLS_AES_256_GCM_SHA384, + } + } + + DefaultCipherSuites := make([]uint16, 0, 22) + DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) + DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) + + m.http = &http.Server{ + Addr: m.cfg.Address, + Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}), + IdleTimeout: time.Hour * 24, + ReadTimeout: time.Minute * 60, + MaxHeaderBytes: maxHeaderSize, + ReadHeaderTimeout: time.Minute * 60, + WriteTimeout: time.Minute * 60, + TLSConfig: &tls.Config{ + CurvePreferences: []tls.CurveID{ + tls.CurveP256, + tls.CurveP384, + tls.CurveP521, + tls.X25519, + }, + CipherSuites: DefaultCipherSuites, + MinVersion: tls.VersionTLS12, + PreferServerCipherSuites: true, + }, + } + m.mu.Unlock() + + err = m.http.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + return err + } + + return nil +} + +// Stop prometheus metrics service. +func (m *Plugin) Stop() { + m.mu.Lock() + defer m.mu.Unlock() + + if m.http != nil { + // gracefully stop server + go func() { + err := m.http.Shutdown(context.Background()) + if err != nil { + // Function should be Stop() error + m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err)) + } + }() + } +} + +func (m *Plugin) Collects() []interface{} { + return []interface{}{ + m.Register, + } +} + +// Collector returns application specific collector by name or nil if collector not found. +func (m *Plugin) Register(stat metrics.StatProvider) error { + m.collectors = append(m.collectors, stat.MetricsCollector()) + return nil +} diff --git a/plugins/metrics/plugin_test.go b/plugins/metrics/plugin_test.go new file mode 100644 index 00000000..aa150504 --- /dev/null +++ b/plugins/metrics/plugin_test.go @@ -0,0 +1,247 @@ +package metrics + +import ( + json "github.com/json-iterator/go" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + "github.com/spiral/roadrunner/service/rpc" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/http" + "testing" + "time" +) + +type testCfg struct { + rpcCfg string + metricsCfg string + target string +} + +func (cfg *testCfg) Get(name string) service.Config { + if name == ID { + return &testCfg{target: cfg.metricsCfg} + } + + if name == rpc.ID { + return &testCfg{target: cfg.rpcCfg} + } + + return nil +} + +func (cfg *testCfg) Unmarshal(out interface{}) error { + j := json.ConfigCompatibleWithStandardLibrary + err := j.Unmarshal([]byte(cfg.target), out) + return err +} + +// get request and return body +func get(url string) (string, *http.Response, error) { + r, err := http.Get(url) + if err != nil { + return "", nil, err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", nil, err + } + + err = r.Body.Close() + if err != nil { + return "", nil, err + } + return string(b), r, err +} + +func TestService_Serve(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Plugin{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2116" + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + out, _, err := get("http://localhost:2116/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "go_gc_duration_seconds") +} + +func Test_ServiceCustomMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Plugin{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2115" + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + collector := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "my_gauge", + Help: "My gauge value", + }) + + assert.NoError(t, s.(*Plugin).Register(collector)) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + collector.Set(100) + + out, _, err := get("http://localhost:2115/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "my_gauge 100") +} + +func Test_ServiceCustomMetricMust(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Plugin{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2114" + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + collector := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "my_gauge_2", + Help: "My gauge value", + }) + + s.(*Plugin).MustRegister(collector) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + collector.Set(100) + + out, _, err := get("http://localhost:2114/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "my_gauge_2 100") +} + +func Test_ConfiguredMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Plugin{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2113", + "collect":{ + "user_gauge":{ + "type": "gauge" + } + } + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + assert.True(t, s.(*Plugin).Enabled()) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + s.(*Plugin).Collector("user_gauge").(prometheus.Gauge).Set(100) + + assert.Nil(t, s.(*Plugin).Collector("invalid")) + + out, _, err := get("http://localhost:2113/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "user_gauge 100") +} + +func Test_ConfiguredDuplicateMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Plugin{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112", + "collect":{ + "go_gc_duration_seconds":{ + "type": "gauge" + } + } + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + assert.True(t, s.(*Plugin).Enabled()) + + assert.Error(t, c.Serve()) +} + +func Test_ConfiguredInvalidMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Plugin{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112", + "collect":{ + "user_gauge":{ + "type": "invalid" + } + } + + }`})) + + assert.Error(t, c.Serve()) +} diff --git a/plugins/metrics/rpc.go b/plugins/metrics/rpc.go new file mode 100644 index 00000000..2dd6d4ef --- /dev/null +++ b/plugins/metrics/rpc.go @@ -0,0 +1,260 @@ +package metrics + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" +) + +type rpcServer struct { + svc *Plugin +} + +// Metric represent single metric produced by the application. +type Metric struct { + // Collector name. + Name string + + // Collector value. + Value float64 + + // Labels associated with metric. Only for vector metrics. Must be provided in a form of label values. + Labels []string +} + +// Add new metric to the designated collector. +func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) { + defer func() { + if r, fail := recover().(error); fail { + err = r + } + }() + + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c := c.(type) { + case prometheus.Gauge: + c.Add(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.WithLabelValues(m.Labels...).Add(m.Value) + + case prometheus.Counter: + c.Add(m.Value) + + case *prometheus.CounterVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.WithLabelValues(m.Labels...).Add(m.Value) + + default: + return fmt.Errorf("collector `%s` does not support method `Add`", m.Name) + } + + // RPC, set ok to true as return value. Need by rpc.Call reply argument + *ok = true + return nil +} + +// Sub subtract the value from the specific metric (gauge only). +func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) { + defer func() { + if r, fail := recover().(error); fail { + err = r + } + }() + + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c := c.(type) { + case prometheus.Gauge: + c.Sub(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.WithLabelValues(m.Labels...).Sub(m.Value) + default: + return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name) + } + + // RPC, set ok to true as return value. Need by rpc.Call reply argument + *ok = true + return nil +} + +// Observe the value (histogram and summary only). +func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) { + defer func() { + if r, fail := recover().(error); fail { + err = r + } + }() + + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c := c.(type) { + case *prometheus.SummaryVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.WithLabelValues(m.Labels...).Observe(m.Value) + + case prometheus.Histogram: + c.Observe(m.Value) + + case *prometheus.HistogramVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.WithLabelValues(m.Labels...).Observe(m.Value) + default: + return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name) + } + + // RPC, set ok to true as return value. Need by rpc.Call reply argument + *ok = true + return nil +} +// Declare is used to register new collector in prometheus +// THE TYPES ARE: +// NamedCollector -> Collector with the name +// bool -> RPC reply value +// RETURNS: +// error +func (rpc *rpcServer) Declare(c *NamedCollector, ok *bool) (err error) { + // MustRegister could panic, so, to return error and not shutdown whole app + // we recover and return error + defer func() { + if r, fail := recover().(error); fail { + err = r + } + }() + + if rpc.svc.Collector(c.Name) != nil { + *ok = false + // alternative is to return error + // fmt.Errorf("tried to register existing collector with the name `%s`", c.Name) + return nil + } + + var collector prometheus.Collector + switch c.Type { + case Histogram: + opts := prometheus.HistogramOpts{ + Name: c.Name, + Namespace: c.Namespace, + Subsystem: c.Subsystem, + Help: c.Help, + Buckets: c.Buckets, + } + + if len(c.Labels) != 0 { + collector = prometheus.NewHistogramVec(opts, c.Labels) + } else { + collector = prometheus.NewHistogram(opts) + } + case Gauge: + opts := prometheus.GaugeOpts{ + Name: c.Name, + Namespace: c.Namespace, + Subsystem: c.Subsystem, + Help: c.Help, + } + + if len(c.Labels) != 0 { + collector = prometheus.NewGaugeVec(opts, c.Labels) + } else { + collector = prometheus.NewGauge(opts) + } + case Counter: + opts := prometheus.CounterOpts{ + Name: c.Name, + Namespace: c.Namespace, + Subsystem: c.Subsystem, + Help: c.Help, + } + + if len(c.Labels) != 0 { + collector = prometheus.NewCounterVec(opts, c.Labels) + } else { + collector = prometheus.NewCounter(opts) + } + case Summary: + opts := prometheus.SummaryOpts{ + Name: c.Name, + Namespace: c.Namespace, + Subsystem: c.Subsystem, + Help: c.Help, + } + + if len(c.Labels) != 0 { + collector = prometheus.NewSummaryVec(opts, c.Labels) + } else { + collector = prometheus.NewSummary(opts) + } + + default: + return fmt.Errorf("unknown collector type `%s`", c.Type) + + } + + // add collector to sync.Map + rpc.svc.collectors.Store(c.Name, collector) + // that method might panic, we handle it by recover + rpc.svc.MustRegister(collector) + + *ok = true + return nil +} + +// Set the metric value (only for gaude). +func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) { + defer func() { + if r, fail := recover().(error); fail { + err = r + } + }() + + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c := c.(type) { + case prometheus.Gauge: + c.Set(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.WithLabelValues(m.Labels...).Set(m.Value) + + default: + return fmt.Errorf("collector `%s` does not support method `Set`", m.Name) + } + + // RPC, set ok to true as return value. Need by rpc.Call reply argument + *ok = true + return nil +} diff --git a/plugins/metrics/rpc_test.go b/plugins/metrics/rpc_test.go new file mode 100644 index 00000000..9b059fe1 --- /dev/null +++ b/plugins/metrics/rpc_test.go @@ -0,0 +1,861 @@ +package metrics + +//import ( +// "github.com/sirupsen/logrus" +// "github.com/sirupsen/logrus/hooks/test" +// "github.com/spiral/roadrunner/service" +// "github.com/spiral/roadrunner/service/rpc" +// "github.com/stretchr/testify/assert" +// rpc2 "net/rpc" +// "strconv" +// "testing" +// "time" +//) +// +//var port = 5004 +// +//func setup(t *testing.T, metric string, portNum string) (*rpc2.Client, service.Container) { +// logger, _ := test.NewNullLogger() +// logger.SetLevel(logrus.DebugLevel) +// +// c := service.NewContainer(logger) +// c.Register(rpc.ID, &rpc.Plugin{}) +// c.Register(ID, &Plugin{}) +// +// assert.NoError(t, c.Init(&testCfg{ +// rpcCfg: `{"enable":true, "listen":"tcp://:` + strconv.Itoa(port) + `"}`, +// metricsCfg: `{ +// "address": "localhost:` + portNum + `", +// "collect":{ +// ` + metric + ` +// } +// }`})) +// +// // rotate ports for travis +// port++ +// +// s, _ := c.Get(ID) +// assert.NotNil(t, s) +// +// s2, _ := c.Get(rpc.ID) +// rs := s2.(*rpc.Plugin) +// +// assert.True(t, s.(*Plugin).Enabled()) +// +// go func() { +// err := c.Serve() +// if err != nil { +// t.Errorf("error during the Serve: error %v", err) +// } +// }() +// time.Sleep(time.Millisecond * 200) +// +// client, err := rs.Client() +// assert.NoError(t, err) +// if err != nil { +// panic(err) +// } +// +// return client, c +//} +// +//func Test_Set_RPC(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge" +// }`, +// "2112", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Set", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2112/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_gauge 100`) +//} +// +//func Test_Set_RPC_Vector(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2113", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Set", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// Labels: []string{"core", "first"}, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2113/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_gauge{section="first",type="core"} 100`) +//} +// +//func Test_Set_RPC_CollectorError(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2114", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Set", Metric{ +// Name: "user_gauge_2", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +//} +// +//func Test_Set_RPC_MetricError(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2115", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Set", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +//} +// +//func Test_Set_RPC_MetricError_2(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2116", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Set", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// }, &ok)) +//} +// +//func Test_Set_RPC_MetricError_3(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "histogram", +// "labels": ["type", "section"] +// }`, +// "2117", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Set", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// }, &ok)) +//} +// +//// sub +// +//func Test_Sub_RPC(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge" +// }`, +// "2118", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Add", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// }, &ok)) +// assert.True(t, ok) +// +// assert.NoError(t, client.Call("metrics.Sub", Metric{ +// Name: "user_gauge", +// Value: 10.0, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2118/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_gauge 90`) +//} +// +//func Test_Sub_RPC_Vector(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2119", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Add", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// Labels: []string{"core", "first"}, +// }, &ok)) +// assert.True(t, ok) +// +// assert.NoError(t, client.Call("metrics.Sub", Metric{ +// Name: "user_gauge", +// Value: 10.0, +// Labels: []string{"core", "first"}, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2119/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_gauge{section="first",type="core"} 90`) +//} +// +//func Test_Register_RPC_Histogram(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2319", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Declare", &NamedCollector{ +// Name: "custom_histogram", +// Collector: Collector{ +// Namespace: "test_histogram", +// Subsystem: "test_histogram", +// Type: Histogram, +// Help: "test_histogram", +// Labels: nil, +// Buckets: []float64{0.1, 0.2, 0.5}, +// }, +// }, &ok)) +// assert.True(t, ok) +// +// var ok2 bool +// // histogram does not support Add, should be an error +// assert.Error(t, client.Call("metrics.Add", Metric{ +// Name: "custom_histogram", +// }, &ok2)) +// // ok should became false +// assert.False(t, ok2) +// +// out, _, err := get("http://localhost:2319/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `TYPE test_histogram_test_histogram_custom_histogram histogram`) +// +// // check buckets +// assert.Contains(t, out, `test_histogram_test_histogram_custom_histogram_bucket{le="0.1"} 0`) +// assert.Contains(t, out, `test_histogram_test_histogram_custom_histogram_bucket{le="0.2"} 0`) +// assert.Contains(t, out, `test_histogram_test_histogram_custom_histogram_bucket{le="0.5"} 0`) +//} +// +//func Test_Register_RPC_Gauge(t *testing.T) { +// // FOR register method, setup used just to init the rpc +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2324", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Declare", &NamedCollector{ +// Name: "custom_gauge", +// Collector: Collector{ +// Namespace: "test_gauge", +// Subsystem: "test_gauge", +// Type: Gauge, +// Help: "test_gauge", +// Labels: []string{"type", "section"}, +// Buckets: nil, +// }, +// }, &ok)) +// assert.True(t, ok) +// +// var ok2 bool +// // Add to custom_gauge +// assert.NoError(t, client.Call("metrics.Add", Metric{ +// Name: "custom_gauge", +// Value: 100.0, +// Labels: []string{"core", "first"}, +// }, &ok2)) +// // ok should became true +// assert.True(t, ok2) +// +// // Subtract from custom runtime metric +// var ok3 bool +// assert.NoError(t, client.Call("metrics.Sub", Metric{ +// Name: "custom_gauge", +// Value: 10.0, +// Labels: []string{"core", "first"}, +// }, &ok3)) +// assert.True(t, ok3) +// +// out, _, err := get("http://localhost:2324/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `test_gauge_test_gauge_custom_gauge{section="first",type="core"} 90`) +//} +// +//func Test_Register_RPC_Counter(t *testing.T) { +// // FOR register method, setup used just to init the rpc +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2328", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Declare", &NamedCollector{ +// Name: "custom_counter", +// Collector: Collector{ +// Namespace: "test_counter", +// Subsystem: "test_counter", +// Type: Counter, +// Help: "test_counter", +// Labels: []string{"type", "section"}, +// Buckets: nil, +// }, +// }, &ok)) +// assert.True(t, ok) +// +// var ok2 bool +// // Add to custom_counter +// assert.NoError(t, client.Call("metrics.Add", Metric{ +// Name: "custom_counter", +// Value: 100.0, +// Labels: []string{"type2", "section2"}, +// }, &ok2)) +// // ok should became true +// assert.True(t, ok2) +// +// out, _, err := get("http://localhost:2328/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `test_counter_test_counter_custom_counter{section="section2",type="type2"} 100`) +//} +// +//func Test_Register_RPC_Summary(t *testing.T) { +// // FOR register method, setup used just to init the rpc +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "6666", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Declare", &NamedCollector{ +// Name: "custom_summary", +// Collector: Collector{ +// Namespace: "test_summary", +// Subsystem: "test_summary", +// Type: Summary, +// Help: "test_summary", +// Labels: nil, +// Buckets: nil, +// }, +// }, &ok)) +// assert.True(t, ok) +// +// var ok2 bool +// // Add to custom_summary is not supported +// assert.Error(t, client.Call("metrics.Add", Metric{ +// Name: "custom_summary", +// Value: 100.0, +// Labels: []string{"type22", "section22"}, +// }, &ok2)) +// // ok should became false +// assert.False(t, ok2) +// +// out, _, err := get("http://localhost:6666/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `test_summary_test_summary_custom_summary_sum 0`) +// assert.Contains(t, out, `test_summary_test_summary_custom_summary_count 0`) +//} +// +//func Test_Sub_RPC_CollectorError(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2120", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Sub", Metric{ +// Name: "user_gauge_2", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +//} +// +//func Test_Sub_RPC_MetricError(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2121", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Sub", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +//} +// +//func Test_Sub_RPC_MetricError_2(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "gauge", +// "labels": ["type", "section"] +// }`, +// "2122", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Sub", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// }, &ok)) +//} +// +//func Test_Sub_RPC_MetricError_3(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "histogram", +// "labels": ["type", "section"] +// }`, +// "2123", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Sub", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// }, &ok)) +//} +// +//// -- observe +// +//func Test_Observe_RPC(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "histogram" +// }`, +// "2124", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2124/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_histogram`) +//} +// +//func Test_Observe_RPC_Vector(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "histogram", +// "labels": ["type", "section"] +// }`, +// "2125", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// Labels: []string{"core", "first"}, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2125/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_histogram`) +//} +// +//func Test_Observe_RPC_CollectorError(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "histogram", +// "labels": ["type", "section"] +// }`, +// "2126", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +//} +// +//func Test_Observe_RPC_MetricError(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "histogram", +// "labels": ["type", "section"] +// }`, +// "2127", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +//} +// +//func Test_Observe_RPC_MetricError_2(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "histogram", +// "labels": ["type", "section"] +// }`, +// "2128", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// }, &ok)) +//} +// +//// -- observe summary +// +//func Test_Observe2_RPC(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "summary" +// }`, +// "2129", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2129/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_histogram`) +//} +// +//func Test_Observe2_RPC_Invalid(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "summary" +// }`, +// "2130", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram_2", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +//} +// +//func Test_Observe2_RPC_Invalid_2(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "gauge" +// }`, +// "2131", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// }, &ok)) +//} +// +//func Test_Observe2_RPC_Vector(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "summary", +// "labels": ["type", "section"] +// }`, +// "2132", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// Labels: []string{"core", "first"}, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2132/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_histogram`) +//} +// +//func Test_Observe2_RPC_CollectorError(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "summary", +// "labels": ["type", "section"] +// }`, +// "2133", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +//} +// +//func Test_Observe2_RPC_MetricError(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "summary", +// "labels": ["type", "section"] +// }`, +// "2134", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +//} +// +//func Test_Observe2_RPC_MetricError_2(t *testing.T) { +// client, c := setup( +// t, +// `"user_histogram":{ +// "type": "summary", +// "labels": ["type", "section"] +// }`, +// "2135", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Observe", Metric{ +// Name: "user_histogram", +// Value: 100.0, +// }, &ok)) +//} +// +//// add +//func Test_Add_RPC(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "counter" +// }`, +// "2136", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Add", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2136/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_gauge 100`) +//} +// +//func Test_Add_RPC_Vector(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "counter", +// "labels": ["type", "section"] +// }`, +// "2137", +// ) +// defer c.Stop() +// +// var ok bool +// assert.NoError(t, client.Call("metrics.Add", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// Labels: []string{"core", "first"}, +// }, &ok)) +// assert.True(t, ok) +// +// out, _, err := get("http://localhost:2137/metrics") +// assert.NoError(t, err) +// assert.Contains(t, out, `user_gauge{section="first",type="core"} 100`) +//} +// +//func Test_Add_RPC_CollectorError(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "counter", +// "labels": ["type", "section"] +// }`, +// "2138", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Add", Metric{ +// Name: "user_gauge_2", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +// +// assert.False(t, ok) +//} +// +//func Test_Add_RPC_MetricError(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "counter", +// "labels": ["type", "section"] +// }`, +// "2139", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Add", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// Labels: []string{"missing"}, +// }, &ok)) +// +// assert.False(t, ok) +//} +// +//func Test_Add_RPC_MetricError_2(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "counter", +// "labels": ["type", "section"] +// }`, +// "2140", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Add", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// }, &ok)) +// +// assert.False(t, ok) +//} +// +//func Test_Add_RPC_MetricError_3(t *testing.T) { +// client, c := setup( +// t, +// `"user_gauge":{ +// "type": "histogram", +// "labels": ["type", "section"] +// }`, +// "2141", +// ) +// defer c.Stop() +// +// var ok bool +// assert.Error(t, client.Call("metrics.Add", Metric{ +// Name: "user_gauge", +// Value: 100.0, +// }, &ok)) +//} |