diff options
author | Valery Piashchynski <[email protected]> | 2020-11-13 13:42:40 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-11-13 13:42:40 +0300 |
commit | 002eb4bb1981558fa5e614aed22d322f0f45d7ea (patch) | |
tree | 9c03ebf549741e31875c5b354de72d14ce0e8b97 /plugins/metrics | |
parent | 7eb675a031d751787b31bd6894c936e86b190ebf (diff) |
Move all interfaces to the separate folder [RPC, METRICS, LOGGER]
RPC for the metrics update to the working state
RCP interface renamed to the RPCer
Diffstat (limited to 'plugins/metrics')
-rw-r--r-- | plugins/metrics/plugin.go | 96 | ||||
-rw-r--r-- | plugins/metrics/rpc.go | 182 | ||||
-rw-r--r-- | plugins/metrics/tests/.rr-test.yaml | 9 | ||||
-rw-r--r-- | plugins/metrics/tests/docker-compose.yml | 7 | ||||
-rw-r--r-- | plugins/metrics/tests/metrics_test.go | 51 |
5 files changed, 230 insertions, 115 deletions
diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go index b9b79d95..3795386b 100644 --- a/plugins/metrics/plugin.go +++ b/plugins/metrics/plugin.go @@ -11,37 +11,59 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spiral/endure" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/log" - "github.com/spiral/roadrunner/v2/metrics" + "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/metrics" + "github.com/spiral/roadrunner/v2/plugins/config" "golang.org/x/sys/cpu" ) const ( // ID declares public service name. - ID = "metrics" + ServiceName = "metrics" // maxHeaderSize declares max header size for prometheus server maxHeaderSize = 1024 * 1024 * 100 // 104MB ) +type statsProvider struct { + collector prometheus.Collector + name string +} + // Plugin to manage application metrics using Prometheus. type Plugin struct { cfg Config log log.Logger mu sync.Mutex // all receivers are pointers http *http.Server - collectors []prometheus.Collector //sync.Map // all receivers are pointers + collectors sync.Map //[]statsProvider registry *prometheus.Registry } // Init service. -func (m *Plugin) Init(cfg Config, log log.Logger) (bool, error) { - m.cfg = cfg +func (m *Plugin) Init(cfg config.Configurer, log log.Logger) error { + const op = errors.Op("Metrics Init") + err := cfg.UnmarshalKey(ServiceName, &m.cfg) + if err != nil { + return err + } + + //m.cfg.InitDefaults() + m.log = log m.registry = prometheus.NewRegistry() - m.registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) - m.registry.MustRegister(prometheus.NewGoCollector()) + err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + if err != nil { + return errors.E(op, err) + } + err = m.registry.Register(prometheus.NewGoCollector()) + if err != nil { + return errors.E(op, err) + } + + //m.collectors = make([]statsProvider, 0, 2) //if r != nil { // if err := r.Register(ID, &rpcServer{s}); err != nil { @@ -49,7 +71,7 @@ func (m *Plugin) Init(cfg Config, log log.Logger) (bool, error) { // } //} - return true, nil + return nil } // Enabled indicates that server is able to collect metrics. @@ -57,31 +79,35 @@ func (m *Plugin) Init(cfg Config, log log.Logger) (bool, error) { // return m.cfg != nil //} // -//// Register new prometheus collector. -//func (m *Plugin) Register(c prometheus.Collector) error { -// return m.registry.Register(c) -//} +// Register new prometheus collector. +func (m *Plugin) Register(c prometheus.Collector) error { + return m.registry.Register(c) +} // MustRegister registers new collector or fails with panic. -func (m *Plugin) MustRegister(c prometheus.Collector) { - m.registry.MustRegister(c) -} +//func (m *Plugin) MustRegister(c prometheus.Collector) { +// m.registry.MustRegister(c) +//} // Serve prometheus metrics service. -func (m *Plugin) Serve() error { +func (m *Plugin) Serve() chan error { + errCh := make(chan error, 1) // register application specific metrics - collectors, err := m.cfg.getCollectors() - if err != nil { - return err - } + //collectors, err := m.cfg.getCollectors() + //if err != nil { + // return err + //} - for name, collector := range collectors { - if err := m.registry.Register(collector); err != nil { - return err + m.collectors.Range(func(key, value interface{}) bool { + // key - name + // value - collector + c := value.(prometheus.Collector) + if err := m.registry.Register(c); err != nil { + errCh <- err + return false } - - m.collectors.Store(name, collector) - } + return true + }) m.mu.Lock() @@ -155,12 +181,13 @@ func (m *Plugin) Serve() error { } m.mu.Unlock() - err = m.http.ListenAndServe() + err := m.http.ListenAndServe() if err != nil && err != http.ErrServerClosed { - return err + errCh <- err + return errCh } - return nil + return errCh } // Stop prometheus metrics service. @@ -182,12 +209,15 @@ func (m *Plugin) Stop() { func (m *Plugin) Collects() []interface{} { return []interface{}{ - m.Register, + m.AddStatProvider, } } // Collector returns application specific collector by name or nil if collector not found. -func (m *Plugin) Register(stat metrics.StatProvider) error { - m.collectors = append(m.collectors, stat.MetricsCollector()) +func (m *Plugin) AddStatProvider(name endure.Named, stat metrics.StatProvider) error { + m.collectors.Store(name, statsProvider{ + collector: stat.MetricsCollector(), + name: name.Name(), + }) return nil } diff --git a/plugins/metrics/rpc.go b/plugins/metrics/rpc.go index 2dd6d4ef..a5be2204 100644 --- a/plugins/metrics/rpc.go +++ b/plugins/metrics/rpc.go @@ -1,8 +1,8 @@ package metrics import ( - "fmt" "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/errors" ) type rpcServer struct { @@ -22,16 +22,17 @@ type Metric struct { } // Add new metric to the designated collector. -func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) { - defer func() { - if r, fail := recover().(error); fail { - err = r - } - }() - - c := rpc.svc.Collector(m.Name) - if c == nil { - return fmt.Errorf("undefined collector `%s`", m.Name) +func (rpc *rpcServer) Add(m *Metric, ok *bool) error { + const op = errors.Op("Add metric") + //defer func() { + // if r, fail := recover().(error); fail { + // err = r + // } + //}() + + c, exist := rpc.svc.collectors.Load(m.Name) + if !exist { + return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name)) } switch c := c.(type) { @@ -40,7 +41,7 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) { case *prometheus.GaugeVec: if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) + return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) } c.WithLabelValues(m.Labels...).Add(m.Value) @@ -50,13 +51,13 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) { case *prometheus.CounterVec: if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) + return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) } c.WithLabelValues(m.Labels...).Add(m.Value) default: - return fmt.Errorf("collector `%s` does not support method `Add`", m.Name) + return errors.E(op, errors.Errorf("collector `%s` does not support method `Add`", m.Name)) } // RPC, set ok to true as return value. Need by rpc.Call reply argument @@ -65,16 +66,21 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) { } // Sub subtract the value from the specific metric (gauge only). -func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) { - defer func() { - if r, fail := recover().(error); fail { - err = r - } - }() - - c := rpc.svc.Collector(m.Name) +func (rpc *rpcServer) Sub(m *Metric, ok *bool) error { + const op = errors.Op("Sub metric") + //defer func() { + // if r, fail := recover().(error); fail { + // err = r + // } + //}() + + c, exist := rpc.svc.collectors.Load(m.Name) + if !exist { + return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name)) + } if c == nil { - return fmt.Errorf("undefined collector `%s`", m.Name) + // can it be nil ??? I guess can't + return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name)) } switch c := c.(type) { @@ -83,12 +89,12 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) { case *prometheus.GaugeVec: if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) + return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) } c.WithLabelValues(m.Labels...).Sub(m.Value) default: - return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name) + return errors.E(op, errors.Errorf("collector `%s` does not support method `Sub`", m.Name)) } // RPC, set ok to true as return value. Need by rpc.Call reply argument @@ -97,22 +103,26 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) { } // Observe the value (histogram and summary only). -func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) { - defer func() { - if r, fail := recover().(error); fail { - err = r - } - }() - - c := rpc.svc.Collector(m.Name) +func (rpc *rpcServer) Observe(m *Metric, ok *bool) error { + const op = errors.Op("Observe metrics") + //defer func() { + // if r, fail := recover().(error); fail { + // err = r + // } + //}() + + c, exist := rpc.svc.collectors.Load(m.Name) + if !exist { + return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name)) + } if c == nil { - return fmt.Errorf("undefined collector `%s`", m.Name) + return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name)) } switch c := c.(type) { case *prometheus.SummaryVec: if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) + return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) } c.WithLabelValues(m.Labels...).Observe(m.Value) @@ -122,105 +132,109 @@ func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) { case *prometheus.HistogramVec: if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) + return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) } c.WithLabelValues(m.Labels...).Observe(m.Value) default: - return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name) + return errors.E(op, errors.Errorf("collector `%s` does not support method `Observe`", m.Name)) } // RPC, set ok to true as return value. Need by rpc.Call reply argument *ok = true return nil } + // Declare is used to register new collector in prometheus // THE TYPES ARE: // NamedCollector -> Collector with the name // bool -> RPC reply value // RETURNS: // error -func (rpc *rpcServer) Declare(c *NamedCollector, ok *bool) (err error) { +func (rpc *rpcServer) Declare(nc *NamedCollector, ok *bool) error { + const op = errors.Op("Declare metric") // MustRegister could panic, so, to return error and not shutdown whole app // we recover and return error - defer func() { - if r, fail := recover().(error); fail { - err = r - } - }() - - if rpc.svc.Collector(c.Name) != nil { - *ok = false - // alternative is to return error - // fmt.Errorf("tried to register existing collector with the name `%s`", c.Name) - return nil + //defer func() { + // if r, fail := recover().(error); fail { + // err = r + // } + //}() + + _, exist := rpc.svc.collectors.Load(nc.Name) + if exist { + return errors.E(op, errors.Errorf("tried to register existing collector with the name `%s`", nc.Name)) } var collector prometheus.Collector - switch c.Type { + switch nc.Type { case Histogram: opts := prometheus.HistogramOpts{ - Name: c.Name, - Namespace: c.Namespace, - Subsystem: c.Subsystem, - Help: c.Help, - Buckets: c.Buckets, + Name: nc.Name, + Namespace: nc.Namespace, + Subsystem: nc.Subsystem, + Help: nc.Help, + Buckets: nc.Buckets, } - if len(c.Labels) != 0 { - collector = prometheus.NewHistogramVec(opts, c.Labels) + if len(nc.Labels) != 0 { + collector = prometheus.NewHistogramVec(opts, nc.Labels) } else { collector = prometheus.NewHistogram(opts) } case Gauge: opts := prometheus.GaugeOpts{ - Name: c.Name, - Namespace: c.Namespace, - Subsystem: c.Subsystem, - Help: c.Help, + Name: nc.Name, + Namespace: nc.Namespace, + Subsystem: nc.Subsystem, + Help: nc.Help, } - if len(c.Labels) != 0 { - collector = prometheus.NewGaugeVec(opts, c.Labels) + if len(nc.Labels) != 0 { + collector = prometheus.NewGaugeVec(opts, nc.Labels) } else { collector = prometheus.NewGauge(opts) } case Counter: opts := prometheus.CounterOpts{ - Name: c.Name, - Namespace: c.Namespace, - Subsystem: c.Subsystem, - Help: c.Help, + Name: nc.Name, + Namespace: nc.Namespace, + Subsystem: nc.Subsystem, + Help: nc.Help, } - if len(c.Labels) != 0 { - collector = prometheus.NewCounterVec(opts, c.Labels) + if len(nc.Labels) != 0 { + collector = prometheus.NewCounterVec(opts, nc.Labels) } else { collector = prometheus.NewCounter(opts) } case Summary: opts := prometheus.SummaryOpts{ - Name: c.Name, - Namespace: c.Namespace, - Subsystem: c.Subsystem, - Help: c.Help, + Name: nc.Name, + Namespace: nc.Namespace, + Subsystem: nc.Subsystem, + Help: nc.Help, } - if len(c.Labels) != 0 { - collector = prometheus.NewSummaryVec(opts, c.Labels) + if len(nc.Labels) != 0 { + collector = prometheus.NewSummaryVec(opts, nc.Labels) } else { collector = prometheus.NewSummary(opts) } default: - return fmt.Errorf("unknown collector type `%s`", c.Type) + return errors.E(op, errors.Errorf("unknown collector type `%s`", nc.Type)) } // add collector to sync.Map - rpc.svc.collectors.Store(c.Name, collector) + rpc.svc.collectors.Store(nc.Name, collector) // that method might panic, we handle it by recover - rpc.svc.MustRegister(collector) + err := rpc.svc.Register(collector) + if err != nil { + *ok = false + return errors.E(op, err) + } *ok = true return nil @@ -228,15 +242,19 @@ func (rpc *rpcServer) Declare(c *NamedCollector, ok *bool) (err error) { // Set the metric value (only for gaude). func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) { + const op = errors.Op("Set metric") defer func() { if r, fail := recover().(error); fail { err = r } }() - c := rpc.svc.Collector(m.Name) + c, exist := rpc.svc.collectors.Load(m.Name) + if !exist { + return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name)) + } if c == nil { - return fmt.Errorf("undefined collector `%s`", m.Name) + return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name)) } switch c := c.(type) { @@ -245,13 +263,13 @@ func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) { case *prometheus.GaugeVec: if len(m.Labels) == 0 { - return fmt.Errorf("required labels for collector `%s`", m.Name) + return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) } c.WithLabelValues(m.Labels...).Set(m.Value) default: - return fmt.Errorf("collector `%s` does not support method `Set`", m.Name) + return errors.E(op, errors.Errorf("collector `%s` does not support method `Set`", m.Name)) } // RPC, set ok to true as return value. Need by rpc.Call reply argument diff --git a/plugins/metrics/tests/.rr-test.yaml b/plugins/metrics/tests/.rr-test.yaml new file mode 100644 index 00000000..cc4771d4 --- /dev/null +++ b/plugins/metrics/tests/.rr-test.yaml @@ -0,0 +1,9 @@ +metrics: + # prometheus client address (path /metrics added automatically) + address: localhost:2112 + collect: + app_metric: + type: histogram + help: "Custom application metric" + labels: ["type"] + buckets: [0.1, 0.2, 0.3, 1.0]
\ No newline at end of file diff --git a/plugins/metrics/tests/docker-compose.yml b/plugins/metrics/tests/docker-compose.yml new file mode 100644 index 00000000..610633b4 --- /dev/null +++ b/plugins/metrics/tests/docker-compose.yml @@ -0,0 +1,7 @@ +version: '3.7' + +services: + prometheus: + image: prom/prometheus + ports: + - 9090:9090 diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go new file mode 100644 index 00000000..d04c75d3 --- /dev/null +++ b/plugins/metrics/tests/metrics_test.go @@ -0,0 +1,51 @@ +package tests + +import ( + "fmt" + "testing" + + "github.com/spiral/endure" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/metrics" +) + +func TestMetricsInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = ".rr-test.yaml" + + err = cont.Register(cfg) + if err != nil { + t.Fatal(err) + } + + err = cont.Register(&metrics.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = cont.Register(&logger.ZapLogger{}) + if err != nil { + t.Fatal(err) + } + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + errCh, err := cont.Serve() + + for { + select { + case e := <-errCh: + fmt.Println(e) + } + } +} |