summaryrefslogtreecommitdiff
path: root/plugins/metrics/rpc.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-13 13:42:40 +0300
committerValery Piashchynski <[email protected]>2020-11-13 13:42:40 +0300
commit002eb4bb1981558fa5e614aed22d322f0f45d7ea (patch)
tree9c03ebf549741e31875c5b354de72d14ce0e8b97 /plugins/metrics/rpc.go
parent7eb675a031d751787b31bd6894c936e86b190ebf (diff)
Move all interfaces to the separate folder [RPC, METRICS, LOGGER]
RPC for the metrics update to the working state RCP interface renamed to the RPCer
Diffstat (limited to 'plugins/metrics/rpc.go')
-rw-r--r--plugins/metrics/rpc.go182
1 files changed, 100 insertions, 82 deletions
diff --git a/plugins/metrics/rpc.go b/plugins/metrics/rpc.go
index 2dd6d4ef..a5be2204 100644
--- a/plugins/metrics/rpc.go
+++ b/plugins/metrics/rpc.go
@@ -1,8 +1,8 @@
package metrics
import (
- "fmt"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/errors"
)
type rpcServer struct {
@@ -22,16 +22,17 @@ type Metric struct {
}
// Add new metric to the designated collector.
-func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) {
- defer func() {
- if r, fail := recover().(error); fail {
- err = r
- }
- }()
-
- c := rpc.svc.Collector(m.Name)
- if c == nil {
- return fmt.Errorf("undefined collector `%s`", m.Name)
+func (rpc *rpcServer) Add(m *Metric, ok *bool) error {
+ const op = errors.Op("Add metric")
+ //defer func() {
+ // if r, fail := recover().(error); fail {
+ // err = r
+ // }
+ //}()
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
}
switch c := c.(type) {
@@ -40,7 +41,7 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) {
case *prometheus.GaugeVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Add(m.Value)
@@ -50,13 +51,13 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) {
case *prometheus.CounterVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Add(m.Value)
default:
- return fmt.Errorf("collector `%s` does not support method `Add`", m.Name)
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Add`", m.Name))
}
// RPC, set ok to true as return value. Need by rpc.Call reply argument
@@ -65,16 +66,21 @@ func (rpc *rpcServer) Add(m *Metric, ok *bool) (err error) {
}
// Sub subtract the value from the specific metric (gauge only).
-func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) {
- defer func() {
- if r, fail := recover().(error); fail {
- err = r
- }
- }()
-
- c := rpc.svc.Collector(m.Name)
+func (rpc *rpcServer) Sub(m *Metric, ok *bool) error {
+ const op = errors.Op("Sub metric")
+ //defer func() {
+ // if r, fail := recover().(error); fail {
+ // err = r
+ // }
+ //}()
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
+ }
if c == nil {
- return fmt.Errorf("undefined collector `%s`", m.Name)
+ // can it be nil ??? I guess can't
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
}
switch c := c.(type) {
@@ -83,12 +89,12 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) {
case *prometheus.GaugeVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Sub(m.Value)
default:
- return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name)
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Sub`", m.Name))
}
// RPC, set ok to true as return value. Need by rpc.Call reply argument
@@ -97,22 +103,26 @@ func (rpc *rpcServer) Sub(m *Metric, ok *bool) (err error) {
}
// Observe the value (histogram and summary only).
-func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) {
- defer func() {
- if r, fail := recover().(error); fail {
- err = r
- }
- }()
-
- c := rpc.svc.Collector(m.Name)
+func (rpc *rpcServer) Observe(m *Metric, ok *bool) error {
+ const op = errors.Op("Observe metrics")
+ //defer func() {
+ // if r, fail := recover().(error); fail {
+ // err = r
+ // }
+ //}()
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
+ }
if c == nil {
- return fmt.Errorf("undefined collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
}
switch c := c.(type) {
case *prometheus.SummaryVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Observe(m.Value)
@@ -122,105 +132,109 @@ func (rpc *rpcServer) Observe(m *Metric, ok *bool) (err error) {
case *prometheus.HistogramVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Observe(m.Value)
default:
- return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name)
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Observe`", m.Name))
}
// RPC, set ok to true as return value. Need by rpc.Call reply argument
*ok = true
return nil
}
+
// Declare is used to register new collector in prometheus
// THE TYPES ARE:
// NamedCollector -> Collector with the name
// bool -> RPC reply value
// RETURNS:
// error
-func (rpc *rpcServer) Declare(c *NamedCollector, ok *bool) (err error) {
+func (rpc *rpcServer) Declare(nc *NamedCollector, ok *bool) error {
+ const op = errors.Op("Declare metric")
// MustRegister could panic, so, to return error and not shutdown whole app
// we recover and return error
- defer func() {
- if r, fail := recover().(error); fail {
- err = r
- }
- }()
-
- if rpc.svc.Collector(c.Name) != nil {
- *ok = false
- // alternative is to return error
- // fmt.Errorf("tried to register existing collector with the name `%s`", c.Name)
- return nil
+ //defer func() {
+ // if r, fail := recover().(error); fail {
+ // err = r
+ // }
+ //}()
+
+ _, exist := rpc.svc.collectors.Load(nc.Name)
+ if exist {
+ return errors.E(op, errors.Errorf("tried to register existing collector with the name `%s`", nc.Name))
}
var collector prometheus.Collector
- switch c.Type {
+ switch nc.Type {
case Histogram:
opts := prometheus.HistogramOpts{
- Name: c.Name,
- Namespace: c.Namespace,
- Subsystem: c.Subsystem,
- Help: c.Help,
- Buckets: c.Buckets,
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ Buckets: nc.Buckets,
}
- if len(c.Labels) != 0 {
- collector = prometheus.NewHistogramVec(opts, c.Labels)
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewHistogramVec(opts, nc.Labels)
} else {
collector = prometheus.NewHistogram(opts)
}
case Gauge:
opts := prometheus.GaugeOpts{
- Name: c.Name,
- Namespace: c.Namespace,
- Subsystem: c.Subsystem,
- Help: c.Help,
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
}
- if len(c.Labels) != 0 {
- collector = prometheus.NewGaugeVec(opts, c.Labels)
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewGaugeVec(opts, nc.Labels)
} else {
collector = prometheus.NewGauge(opts)
}
case Counter:
opts := prometheus.CounterOpts{
- Name: c.Name,
- Namespace: c.Namespace,
- Subsystem: c.Subsystem,
- Help: c.Help,
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
}
- if len(c.Labels) != 0 {
- collector = prometheus.NewCounterVec(opts, c.Labels)
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewCounterVec(opts, nc.Labels)
} else {
collector = prometheus.NewCounter(opts)
}
case Summary:
opts := prometheus.SummaryOpts{
- Name: c.Name,
- Namespace: c.Namespace,
- Subsystem: c.Subsystem,
- Help: c.Help,
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
}
- if len(c.Labels) != 0 {
- collector = prometheus.NewSummaryVec(opts, c.Labels)
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewSummaryVec(opts, nc.Labels)
} else {
collector = prometheus.NewSummary(opts)
}
default:
- return fmt.Errorf("unknown collector type `%s`", c.Type)
+ return errors.E(op, errors.Errorf("unknown collector type `%s`", nc.Type))
}
// add collector to sync.Map
- rpc.svc.collectors.Store(c.Name, collector)
+ rpc.svc.collectors.Store(nc.Name, collector)
// that method might panic, we handle it by recover
- rpc.svc.MustRegister(collector)
+ err := rpc.svc.Register(collector)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
*ok = true
return nil
@@ -228,15 +242,19 @@ func (rpc *rpcServer) Declare(c *NamedCollector, ok *bool) (err error) {
// Set the metric value (only for gaude).
func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) {
+ const op = errors.Op("Set metric")
defer func() {
if r, fail := recover().(error); fail {
err = r
}
}()
- c := rpc.svc.Collector(m.Name)
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
+ }
if c == nil {
- return fmt.Errorf("undefined collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("undefined collector `%s`", m.Name))
}
switch c := c.(type) {
@@ -245,13 +263,13 @@ func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) {
case *prometheus.GaugeVec:
if len(m.Labels) == 0 {
- return fmt.Errorf("required labels for collector `%s`", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
}
c.WithLabelValues(m.Labels...).Set(m.Value)
default:
- return fmt.Errorf("collector `%s` does not support method `Set`", m.Name)
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Set`", m.Name))
}
// RPC, set ok to true as return value. Need by rpc.Call reply argument