summaryrefslogtreecommitdiff
path: root/plugins/metrics
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-13 13:42:40 +0300
committerValery Piashchynski <[email protected]>2020-11-13 13:42:40 +0300
commit002eb4bb1981558fa5e614aed22d322f0f45d7ea (patch)
tree9c03ebf549741e31875c5b354de72d14ce0e8b97 /plugins/metrics
parent7eb675a031d751787b31bd6894c936e86b190ebf (diff)
Move all interfaces to the separate folder [RPC, METRICS, LOGGER]
RPC for the metrics update to the working state RCP interface renamed to the RPCer
Diffstat (limited to 'plugins/metrics')
-rw-r--r--plugins/metrics/plugin.go96
-rw-r--r--plugins/metrics/rpc.go182
-rw-r--r--plugins/metrics/tests/.rr-test.yaml9
-rw-r--r--plugins/metrics/tests/docker-compose.yml7
-rw-r--r--plugins/metrics/tests/metrics_test.go51
5 files changed, 230 insertions, 115 deletions
diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go
index b9b79d95..3795386b 100644
--- a/plugins/metrics/plugin.go
+++ b/plugins/metrics/plugin.go
@@ -11,37 +11,59 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/spiral/endure"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/log"
- "github.com/spiral/roadrunner/v2/metrics"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/metrics"
+ "github.com/spiral/roadrunner/v2/plugins/config"
"golang.org/x/sys/cpu"
)
const (
// ID declares public service name.
- ID = "metrics"
+ ServiceName = "metrics"
// maxHeaderSize declares max header size for prometheus server
maxHeaderSize = 1024 * 1024 * 100 // 104MB
)
+type statsProvider struct {
+ collector prometheus.Collector
+ name string
+}
+
// Plugin to manage application metrics using Prometheus.
type Plugin struct {
cfg Config
log log.Logger
mu sync.Mutex // all receivers are pointers
http *http.Server
- collectors []prometheus.Collector //sync.Map // all receivers are pointers
+ collectors sync.Map //[]statsProvider
registry *prometheus.Registry
}
// Init service.
-func (m *Plugin) Init(cfg Config, log log.Logger) (bool, error) {
- m.cfg = cfg
+func (m *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+ const op = errors.Op("Metrics Init")
+ err := cfg.UnmarshalKey(ServiceName, &m.cfg)
+ if err != nil {
+ return err
+ }
+
+ //m.cfg.InitDefaults()
+
m.log = log
m.registry = prometheus.NewRegistry()
- m.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
- m.registry.MustRegister(prometheus.NewGoCollector())
+ err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
+ if err != nil {
+ return errors.E(op, err)
+ }
+ err = m.registry.Register(prometheus.NewGoCollector())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ //m.collectors = make([]statsProvider, 0, 2)
//if r != nil {
// if err := r.Register(ID, &rpcServer{s}); err != nil {
@@ -49,7 +71,7 @@ func (m *Plugin) Init(cfg Config, log log.Logger) (bool, error) {
// }
//}
- return true, nil
+ return nil
}
// Enabled indicates that server is able to collect metrics.
@@ -57,31 +79,35 @@ func (m *Plugin) Init(cfg Config, log log.Logger) (bool, error) {
// return m.cfg != nil
//}
//
-//// Register new prometheus collector.
-//func (m *Plugin) Register(c prometheus.Collector) error {
-// return m.registry.Register(c)
-//}
+// Register new prometheus collector.
+func (m *Plugin) Register(c prometheus.Collector) error {
+ return m.registry.Register(c)
+}
// MustRegister registers new collector or fails with panic.
-func (m *Plugin) MustRegister(c prometheus.Collector) {
- m.registry.MustRegister(c)
-}
+//func (m *Plugin) MustRegister(c prometheus.Collector) {
+// m.registry.MustRegister(c)
+//}
// Serve prometheus metrics service.
-func (m *Plugin) Serve() error {
+func (m *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
// register application specific metrics
- collectors, err := m.cfg.getCollectors()
- if err != nil {
- return err
- }
+ //collectors, err := m.cfg.getCollectors()
+ //if err != nil {
+ // return err
+ //}
- for name, collector := range collectors {
- if err := m.registry.Register(collector); err != nil {
- return err
+ m.collectors.Range(func(key, value interface{}) bool {
+ // key - name
+ // value - collector
+ c := value.(prometheus.Collector)
+ if err := m.registry.Register(c); err != nil {
+ errCh <- err
+ return false
}
-
- m.collectors.Store(name, collector)
- }
+ return true
+ })
m.mu.Lock()
@@ -155,12 +181,13 @@ func (m *Plugin) Serve() error {
}
m.mu.Unlock()
- err = m.http.ListenAndServe()
+ err := m.http.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
- return err
+ errCh <- err
+ return errCh
}
- return nil
+ return errCh
}
// Stop prometheus metrics service.
@@ -182,12 +209,15 @@ func (m *Plugin) Stop() {
func (m *Plugin) Collects() []interface{} {
return []interface{}{
- m.Register,
+ m.AddStatProvider,
}
}
// Collector returns application specific collector by name or nil if collector not found.
-func (m *Plugin) Register(stat metrics.StatProvider) error {
- m.collectors = append(m.collectors, stat.MetricsCollector())
+func (m *Plugin) AddStatProvider(name endure.Named, stat metrics.StatProvider) error {
+ m.collectors.Store(name, statsProvider{
+ collector: stat.MetricsCollector(),
+ name: name.Name(),
+ })
return nil
}
diff --git a/plugins/metrics/rpc.go b/plugins/metrics/rpc.go
index 2dd6d4ef..a5be2204 100644
--- a/plugins/metrics/rpc.go
+++ b/plugins/metrics/rpc.go
@@ -1,8 +1,8 @@
package metrics
import (
- "fmt"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/errors"
)
type rpcServer struct {
@@ -22,16 +22,17 @@ type Metric struct {
}
// Add new metric to the designated collector.
-func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) {
- defer func() {
- if r, fail := recover().(error); fail {
- err = r
- }
- }()
-
- c := rpc.svc.Collector(m.Name)
- if c == nil {
- return fmt.Errorf("undefined collector `%s`", m.Name)
+func (rpc *rpcServer) Add(m *Metric, ok *bool) error {
+ const op = errors.Op("Add metric")
+ //defer func() {
+ // if r, fail := recover().(error); fail {
+ // err = r
+ // }
+ //}()
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
}
switch c := c.(type) {
@@ -40,7 +41,7 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) {
case *prometheus.GaugeVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Add(m.Value)
@@ -50,13 +51,13 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) {
case *prometheus.CounterVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Add(m.Value)
default:
- return fmt.Errorf("collector `%s` does not support method `Add`", m.Name)
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Add`", m.Name))
}
// RPC, set ok to true as return value. Need by rpc.Call reply argument
@@ -65,16 +66,21 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) {
}
// Sub subtract the value from the specific metric (gauge only).
-func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) {
- defer func() {
- if r, fail := recover().(error); fail {
- err = r
- }
- }()
-
- c := rpc.svc.Collector(m.Name)
+func (rpc *rpcServer) Sub(m *Metric, ok *bool) error {
+ const op = errors.Op("Sub metric")
+ //defer func() {
+ // if r, fail := recover().(error); fail {
+ // err = r
+ // }
+ //}()
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
+ }
if c == nil {
- return fmt.Errorf("undefined collector `%s`", m.Name)
+ // can it be nil ??? I guess can't
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
}
switch c := c.(type) {
@@ -83,12 +89,12 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) {
case *prometheus.GaugeVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Sub(m.Value)
default:
- return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name)
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Sub`", m.Name))
}
// RPC, set ok to true as return value. Need by rpc.Call reply argument
@@ -97,22 +103,26 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) {
}
// Observe the value (histogram and summary only).
-func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) {
- defer func() {
- if r, fail := recover().(error); fail {
- err = r
- }
- }()
-
- c := rpc.svc.Collector(m.Name)
+func (rpc *rpcServer) Observe(m *Metric, ok *bool) error {
+ const op = errors.Op("Observe metrics")
+ //defer func() {
+ // if r, fail := recover().(error); fail {
+ // err = r
+ // }
+ //}()
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
+ }
if c == nil {
- return fmt.Errorf("undefined collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
}
switch c := c.(type) {
case *prometheus.SummaryVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Observe(m.Value)
@@ -122,105 +132,109 @@ func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) {
case *prometheus.HistogramVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Observe(m.Value)
default:
- return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name)
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Observe`", m.Name))
}
// RPC, set ok to true as return value. Need by rpc.Call reply argument
*ok = true
return nil
}
+
// Declare is used to register new collector in prometheus
// THE TYPES ARE:
// NamedCollector -> Collector with the name
// bool -> RPC reply value
// RETURNS:
// error
-func (rpc *rpcServer) Declare(c *NamedCollector, ok *bool) (err error) {
+func (rpc *rpcServer) Declare(nc *NamedCollector, ok *bool) error {
+ const op = errors.Op("Declare metric")
// MustRegister could panic, so, to return error and not shutdown whole app
// we recover and return error
- defer func() {
- if r, fail := recover().(error); fail {
- err = r
- }
- }()
-
- if rpc.svc.Collector(c.Name) != nil {
- *ok = false
- // alternative is to return error
- // fmt.Errorf("tried to register existing collector with the name `%s`", c.Name)
- return nil
+ //defer func() {
+ // if r, fail := recover().(error); fail {
+ // err = r
+ // }
+ //}()
+
+ _, exist := rpc.svc.collectors.Load(nc.Name)
+ if exist {
+ return errors.E(op, errors.Errorf("tried to register existing collector with the name `%s`", nc.Name))
}
var collector prometheus.Collector
- switch c.Type {
+ switch nc.Type {
case Histogram:
opts := prometheus.HistogramOpts{
- Name: c.Name,
- Namespace: c.Namespace,
- Subsystem: c.Subsystem,
- Help: c.Help,
- Buckets: c.Buckets,
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ Buckets: nc.Buckets,
}
- if len(c.Labels) != 0 {
- collector = prometheus.NewHistogramVec(opts, c.Labels)
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewHistogramVec(opts, nc.Labels)
} else {
collector = prometheus.NewHistogram(opts)
}
case Gauge:
opts := prometheus.GaugeOpts{
- Name: c.Name,
- Namespace: c.Namespace,
- Subsystem: c.Subsystem,
- Help: c.Help,
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
}
- if len(c.Labels) != 0 {
- collector = prometheus.NewGaugeVec(opts, c.Labels)
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewGaugeVec(opts, nc.Labels)
} else {
collector = prometheus.NewGauge(opts)
}
case Counter:
opts := prometheus.CounterOpts{
- Name: c.Name,
- Namespace: c.Namespace,
- Subsystem: c.Subsystem,
- Help: c.Help,
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
}
- if len(c.Labels) != 0 {
- collector = prometheus.NewCounterVec(opts, c.Labels)
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewCounterVec(opts, nc.Labels)
} else {
collector = prometheus.NewCounter(opts)
}
case Summary:
opts := prometheus.SummaryOpts{
- Name: c.Name,
- Namespace: c.Namespace,
- Subsystem: c.Subsystem,
- Help: c.Help,
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
}
- if len(c.Labels) != 0 {
- collector = prometheus.NewSummaryVec(opts, c.Labels)
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewSummaryVec(opts, nc.Labels)
} else {
collector = prometheus.NewSummary(opts)
}
default:
- return fmt.Errorf("unknown collector type `%s`", c.Type)
+ return errors.E(op, errors.Errorf("unknown collector type `%s`", nc.Type))
}
// add collector to sync.Map
- rpc.svc.collectors.Store(c.Name, collector)
+ rpc.svc.collectors.Store(nc.Name, collector)
// that method might panic, we handle it by recover
- rpc.svc.MustRegister(collector)
+ err := rpc.svc.Register(collector)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
*ok = true
return nil
@@ -228,15 +242,19 @@ func (rpc *rpcServer) Declare(c *NamedCollector, ok *bool) (err error) {
// Set the metric value (only for gaude).
func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) {
+ const op = errors.Op("Set metric")
defer func() {
if r, fail := recover().(error); fail {
err = r
}
}()
- c := rpc.svc.Collector(m.Name)
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
+ }
if c == nil {
- return fmt.Errorf("undefined collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
}
switch c := c.(type) {
@@ -245,13 +263,13 @@ func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) {
case *prometheus.GaugeVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Set(m.Value)
default:
- return fmt.Errorf("collector `%s` does not support method `Set`", m.Name)
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Set`", m.Name))
}
// RPC, set ok to true as return value. Need by rpc.Call reply argument
diff --git a/plugins/metrics/tests/.rr-test.yaml b/plugins/metrics/tests/.rr-test.yaml
new file mode 100644
index 00000000..cc4771d4
--- /dev/null
+++ b/plugins/metrics/tests/.rr-test.yaml
@@ -0,0 +1,9 @@
+metrics:
+ # prometheus client address (path /metrics added automatically)
+ address: localhost:2112
+ collect:
+ app_metric:
+ type: histogram
+ help: "Custom application metric"
+ labels: ["type"]
+ buckets: [0.1, 0.2, 0.3, 1.0] \ No newline at end of file
diff --git a/plugins/metrics/tests/docker-compose.yml b/plugins/metrics/tests/docker-compose.yml
new file mode 100644
index 00000000..610633b4
--- /dev/null
+++ b/plugins/metrics/tests/docker-compose.yml
@@ -0,0 +1,7 @@
+version: '3.7'
+
+services:
+ prometheus:
+ image: prom/prometheus
+ ports:
+ - 9090:9090
diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go
new file mode 100644
index 00000000..d04c75d3
--- /dev/null
+++ b/plugins/metrics/tests/metrics_test.go
@@ -0,0 +1,51 @@
+package tests
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/metrics"
+)
+
+func TestMetricsInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ cfg := &config.Viper{}
+ cfg.Prefix = "rr"
+ cfg.Path = ".rr-test.yaml"
+
+ err = cont.Register(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&metrics.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ errCh, err := cont.Serve()
+
+ for {
+ select {
+ case e := <-errCh:
+ fmt.Println(e)
+ }
+ }
+}