diff options
author | Valery Piashchynski <[email protected]> | 2020-11-16 15:46:08 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2020-11-16 15:46:08 +0300 |
commit | 6236aac37bd1661b20400689f66d1e92283c5111 (patch) | |
tree | eb8a9a4e4717fb4cd6c971b5ce67c53b5f6a0f8c /plugins | |
parent | 0874bcb2f6b284a940ba4f3507eb8c4619c27868 (diff) | |
parent | 38f6925db27dd94cfbca873901bf932ed1456906 (diff) |
Merge pull request #392 from spiral/plugin/metricsv2.0.0-alpha18
[RR2] Metrics plugin 2.0
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/logger/plugin.go | 2 | ||||
-rw-r--r-- | plugins/logger/tests/plugin.go | 2 | ||||
-rw-r--r-- | plugins/metrics/config.go | 135 | ||||
-rw-r--r-- | plugins/metrics/config_test.go | 87 | ||||
-rw-r--r-- | plugins/metrics/doc.go | 1 | ||||
-rw-r--r-- | plugins/metrics/plugin.go | 230 | ||||
-rw-r--r-- | plugins/metrics/rpc.go | 294 | ||||
-rw-r--r-- | plugins/metrics/tests/.rr-test.yaml | 13 | ||||
-rw-r--r-- | plugins/metrics/tests/docker-compose.yml | 7 | ||||
-rw-r--r-- | plugins/metrics/tests/metrics_test.go | 754 | ||||
-rw-r--r-- | plugins/metrics/tests/plugin1.go | 46 | ||||
-rwxr-xr-x | plugins/rpc/plugin.go | 36 | ||||
-rw-r--r-- | plugins/rpc/tests/plugin1.go | 4 | ||||
-rw-r--r-- | plugins/server/config.go (renamed from plugins/app/config.go) | 10 | ||||
-rw-r--r-- | plugins/server/plugin.go (renamed from plugins/app/plugin.go) | 24 | ||||
-rw-r--r-- | plugins/server/tests/configs/.rr-no-app-section.yaml (renamed from plugins/app/tests/configs/.rr-no-app-section.yaml) | 2 | ||||
-rw-r--r-- | plugins/server/tests/configs/.rr-sockets.yaml (renamed from plugins/app/tests/configs/.rr-sockets.yaml) | 2 | ||||
-rw-r--r-- | plugins/server/tests/configs/.rr-tcp.yaml (renamed from plugins/app/tests/configs/.rr-tcp.yaml) | 2 | ||||
-rw-r--r-- | plugins/server/tests/configs/.rr-wrong-command.yaml (renamed from plugins/app/tests/configs/.rr-wrong-command.yaml) | 2 | ||||
-rw-r--r-- | plugins/server/tests/configs/.rr-wrong-relay.yaml (renamed from plugins/app/tests/configs/.rr-wrong-relay.yaml) | 2 | ||||
-rw-r--r-- | plugins/server/tests/configs/.rr.yaml (renamed from plugins/app/tests/configs/.rr.yaml) | 2 | ||||
-rw-r--r-- | plugins/server/tests/plugin_pipes.go (renamed from plugins/app/tests/plugin_pipes.go) | 11 | ||||
-rw-r--r-- | plugins/server/tests/plugin_sockets.go (renamed from plugins/app/tests/plugin_sockets.go) | 9 | ||||
-rw-r--r-- | plugins/server/tests/plugin_tcp.go (renamed from plugins/app/tests/plugin_tcp.go) | 9 | ||||
-rw-r--r-- | plugins/server/tests/server_test.go (renamed from plugins/app/tests/app_test.go) | 16 | ||||
-rw-r--r-- | plugins/server/tests/socket.php (renamed from plugins/app/tests/socket.php) | 0 | ||||
-rw-r--r-- | plugins/server/tests/tcp.php (renamed from plugins/app/tests/tcp.php) | 0 |
27 files changed, 1631 insertions, 71 deletions
diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go index f05d0ff0..0a8485d9 100644 --- a/plugins/logger/plugin.go +++ b/plugins/logger/plugin.go @@ -2,7 +2,7 @@ package logger import ( "github.com/spiral/endure" - "github.com/spiral/roadrunner/v2/log" + "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/plugins/config" "go.uber.org/zap" ) diff --git a/plugins/logger/tests/plugin.go b/plugins/logger/tests/plugin.go index 75d2736d..32238f63 100644 --- a/plugins/logger/tests/plugin.go +++ b/plugins/logger/tests/plugin.go @@ -2,7 +2,7 @@ package tests import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/log" + "github.com/spiral/roadrunner/v2/interfaces/log" "github.com/spiral/roadrunner/v2/plugins/config" ) diff --git a/plugins/metrics/config.go b/plugins/metrics/config.go new file mode 100644 index 00000000..933b7eb8 --- /dev/null +++ b/plugins/metrics/config.go @@ -0,0 +1,135 @@ +package metrics + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" +) + +// Config configures metrics service. +type Config struct { + // Address to listen + Address string + + // Collect define application specific metrics. + Collect map[string]Collector +} + +type NamedCollector struct { + // Name of the collector + Name string `json:"name"` + + // Collector structure + Collector `json:"collector"` +} + +// CollectorType represents prometheus collector types +type CollectorType string + +const ( + // Histogram type + Histogram CollectorType = "histogram" + + // Gauge type + Gauge CollectorType = "gauge" + + // Counter type + Counter CollectorType = "counter" + + // Summary type + Summary CollectorType = "summary" +) + +// Collector describes single application specific metric. +type Collector struct { + // Namespace of the metric. + Namespace string `json:"namespace"` + // Subsystem of the metric. + Subsystem string `json:"subsystem"` + // Collector type (histogram, gauge, counter, summary). + Type CollectorType `json:"type"` + // Help of collector. + Help string `json:"help"` + // Labels for vectorized metrics. + Labels []string `json:"labels"` + // Buckets for histogram metric. + Buckets []float64 `json:"buckets"` +} + +// register application specific metrics. +func (c *Config) getCollectors() (map[string]prometheus.Collector, error) { + if c.Collect == nil { + return nil, nil + } + + collectors := make(map[string]prometheus.Collector) + + for name, m := range c.Collect { + var collector prometheus.Collector + switch m.Type { + case Histogram: + opts := prometheus.HistogramOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + Buckets: m.Buckets, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewHistogramVec(opts, m.Labels) + } else { + collector = prometheus.NewHistogram(opts) + } + case Gauge: + opts := prometheus.GaugeOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewGaugeVec(opts, m.Labels) + } else { + collector = prometheus.NewGauge(opts) + } + case Counter: + opts := prometheus.CounterOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewCounterVec(opts, m.Labels) + } else { + collector = prometheus.NewCounter(opts) + } + case Summary: + opts := prometheus.SummaryOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewSummaryVec(opts, m.Labels) + } else { + collector = prometheus.NewSummary(opts) + } + default: + return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name) + } + + collectors[name] = collector + } + + return collectors, nil +} + +func (c *Config) InitDefaults() { + +} diff --git a/plugins/metrics/config_test.go b/plugins/metrics/config_test.go new file mode 100644 index 00000000..24c8406c --- /dev/null +++ b/plugins/metrics/config_test.go @@ -0,0 +1,87 @@ +package metrics + +import ( + "bytes" + "testing" + + json "github.com/json-iterator/go" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" +) + +func Test_Config_Hydrate_Error1(t *testing.T) { + cfg := `{"request": {"From": "Something"}}` + c := &Config{} + f := new(bytes.Buffer) + f.WriteString(cfg) + + err := json.Unmarshal(f.Bytes(), &c) + if err != nil { + t.Fatal(err) + } +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := `{"dir": "/dir/"` + c := &Config{} + + f := new(bytes.Buffer) + f.WriteString(cfg) + + err := json.Unmarshal(f.Bytes(), &c) + assert.Error(t, err) +} + +func Test_Config_Metrics(t *testing.T) { + cfg := `{ +"collect":{ + "metric1":{"type": "gauge"}, + "metric2":{ "type": "counter"}, + "metric3":{"type": "summary"}, + "metric4":{"type": "histogram"} +} +}` + c := &Config{} + f := new(bytes.Buffer) + f.WriteString(cfg) + + err := json.Unmarshal(f.Bytes(), &c) + if err != nil { + t.Fatal(err) + } + + m, err := c.getCollectors() + assert.NoError(t, err) + + assert.IsType(t, prometheus.NewGauge(prometheus.GaugeOpts{}), m["metric1"]) + assert.IsType(t, prometheus.NewCounter(prometheus.CounterOpts{}), m["metric2"]) + assert.IsType(t, prometheus.NewSummary(prometheus.SummaryOpts{}), m["metric3"]) + assert.IsType(t, prometheus.NewHistogram(prometheus.HistogramOpts{}), m["metric4"]) +} + +func Test_Config_MetricsVector(t *testing.T) { + cfg := `{ +"collect":{ + "metric1":{"type": "gauge","labels":["label"]}, + "metric2":{ "type": "counter","labels":["label"]}, + "metric3":{"type": "summary","labels":["label"]}, + "metric4":{"type": "histogram","labels":["label"]} +} +}` + c := &Config{} + f := new(bytes.Buffer) + f.WriteString(cfg) + + err := json.Unmarshal(f.Bytes(), &c) + if err != nil { + t.Fatal(err) + } + + m, err := c.getCollectors() + assert.NoError(t, err) + + assert.IsType(t, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{}), m["metric1"]) + assert.IsType(t, prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{}), m["metric2"]) + assert.IsType(t, prometheus.NewSummaryVec(prometheus.SummaryOpts{}, []string{}), m["metric3"]) + assert.IsType(t, prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{}), m["metric4"]) +} diff --git a/plugins/metrics/doc.go b/plugins/metrics/doc.go new file mode 100644 index 00000000..1abe097a --- /dev/null +++ b/plugins/metrics/doc.go @@ -0,0 +1 @@ +package metrics diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go new file mode 100644 index 00000000..3fd42ee4 --- /dev/null +++ b/plugins/metrics/plugin.go @@ -0,0 +1,230 @@ +package metrics + +import ( + "context" + "crypto/tls" + "net/http" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spiral/endure" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/metrics" + "github.com/spiral/roadrunner/v2/plugins/config" + "golang.org/x/sys/cpu" +) + +const ( + // ID declares public service name. + ServiceName = "metrics" + // maxHeaderSize declares max header size for prometheus server + maxHeaderSize = 1024 * 1024 * 100 // 104MB +) + +type statsProvider struct { + collectors []prometheus.Collector + name string +} + +// Plugin to manage application metrics using Prometheus. +type Plugin struct { + cfg Config + log log.Logger + mu sync.Mutex // all receivers are pointers + http *http.Server + collectors sync.Map // all receivers are pointers + registry *prometheus.Registry +} + +// Init service. +func (m *Plugin) Init(cfg config.Configurer, log log.Logger) error { + const op = errors.Op("Metrics Init") + err := cfg.UnmarshalKey(ServiceName, &m.cfg) + if err != nil { + return err + } + + // TODO figure out what is Init + m.cfg.InitDefaults() + + m.log = log + m.registry = prometheus.NewRegistry() + + // Default + err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + if err != nil { + return errors.E(op, err) + } + + // Default + err = m.registry.Register(prometheus.NewGoCollector()) + if err != nil { + return errors.E(op, err) + } + + collectors, err := m.cfg.getCollectors() + if err != nil { + return errors.E(op, err) + } + + // Register invocation will be later in the Serve method + for k, v := range collectors { + m.collectors.Store(k, statsProvider{ + collectors: []prometheus.Collector{v}, + name: k, + }) + } + return nil +} + +// Register new prometheus collector. +func (m *Plugin) Register(c prometheus.Collector) error { + return m.registry.Register(c) +} + +// Serve prometheus metrics service. +func (m *Plugin) Serve() chan error { + errCh := make(chan error, 1) + m.collectors.Range(func(key, value interface{}) bool { + // key - name + // value - statsProvider struct + c := value.(statsProvider) + for _, v := range c.collectors { + if err := m.registry.Register(v); err != nil { + errCh <- err + return false + } + } + + return true + }) + + var topCipherSuites []uint16 + var defaultCipherSuitesTLS13 []uint16 + + hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ + hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL + // Keep in sync with crypto/aes/cipher_s390x.go. + hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM) + + hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X + + if hasGCMAsm { + // If AES-GCM hardware is provided then prioritise AES-GCM + // cipher suites. + topCipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + } + defaultCipherSuitesTLS13 = []uint16{ + tls.TLS_AES_128_GCM_SHA256, + tls.TLS_CHACHA20_POLY1305_SHA256, + tls.TLS_AES_256_GCM_SHA384, + } + } else { + // Without AES-GCM hardware, we put the ChaCha20-Poly1305 + // cipher suites first. + topCipherSuites = []uint16{ + tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305, + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, + } + defaultCipherSuitesTLS13 = []uint16{ + tls.TLS_CHACHA20_POLY1305_SHA256, + tls.TLS_AES_128_GCM_SHA256, + tls.TLS_AES_256_GCM_SHA384, + } + } + + DefaultCipherSuites := make([]uint16, 0, 22) + DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) + DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) + + m.http = &http.Server{ + Addr: m.cfg.Address, + Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}), + IdleTimeout: time.Hour * 24, + ReadTimeout: time.Minute * 60, + MaxHeaderBytes: maxHeaderSize, + ReadHeaderTimeout: time.Minute * 60, + WriteTimeout: time.Minute * 60, + TLSConfig: &tls.Config{ + CurvePreferences: []tls.CurveID{ + tls.CurveP256, + tls.CurveP384, + tls.CurveP521, + tls.X25519, + }, + CipherSuites: DefaultCipherSuites, + MinVersion: tls.VersionTLS12, + PreferServerCipherSuites: true, + }, + } + + go func() { + err := m.http.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + errCh <- err + return + } + }() + + return errCh +} + +// Stop prometheus metrics service. +func (m *Plugin) Stop() error { + m.mu.Lock() + defer m.mu.Unlock() + + if m.http != nil { + // timeout is 10 seconds + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + err := m.http.Shutdown(ctx) + if err != nil { + // Function should be Stop() error + m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err)) + } + } + return nil +} + +// Collects used to collect all plugins which implement metrics.StatProvider interface (and Named) +func (m *Plugin) Collects() []interface{} { + return []interface{}{ + m.AddStatProvider, + } +} + +// Collector returns application specific collector by name or nil if collector not found. +func (m *Plugin) AddStatProvider(name endure.Named, stat metrics.StatProvider) error { + m.collectors.Store(name.Name(), statsProvider{ + collectors: stat.MetricsCollector(), + name: name.Name(), + }) + return nil +} + +// RPC interface satisfaction +func (m *Plugin) Name() string { + return ServiceName +} + +// RPC interface satisfaction +func (m *Plugin) RPC() interface{} { + return &rpcServer{ + svc: m, + log: m.log, + } +} diff --git a/plugins/metrics/rpc.go b/plugins/metrics/rpc.go new file mode 100644 index 00000000..b8897098 --- /dev/null +++ b/plugins/metrics/rpc.go @@ -0,0 +1,294 @@ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/interfaces/log" +) + +type rpcServer struct { + svc *Plugin + log log.Logger +} + +// Metric represent single metric produced by the application. +type Metric struct { + // Collector name. + Name string + + // Collector value. + Value float64 + + // Labels associated with metric. Only for vector metrics. Must be provided in a form of label values. + Labels []string +} + +// Add new metric to the designated collector. +func (rpc *rpcServer) Add(m *Metric, ok *bool) error { + const op = errors.Op("Add metric") + rpc.log.Info("Adding metric", "name", m.Name, "value", m.Value, "labels", m.Labels) + c, exist := rpc.svc.collectors.Load(m.Name) + if !exist { + rpc.log.Error("undefined collector", "collector", m.Name) + return errors.E(op, errors.Errorf("undefined collector %s, try first Declare the desired collector", m.Name)) + } + + switch c := c.(type) { + case prometheus.Gauge: + c.Add(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + rpc.log.Error("required labels for collector", "collector", m.Name) + return errors.E(op, errors.Errorf("required labels for collector %s", m.Name)) + } + + gauge, err := c.GetMetricWithLabelValues(m.Labels...) + if err != nil { + rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) + return errors.E(op, err) + } + gauge.Add(m.Value) + case prometheus.Counter: + c.Add(m.Value) + + case *prometheus.CounterVec: + if len(m.Labels) == 0 { + return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) + } + + gauge, err := c.GetMetricWithLabelValues(m.Labels...) + if err != nil { + rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) + return errors.E(op, err) + } + gauge.Add(m.Value) + + default: + return errors.E(op, errors.Errorf("collector %s does not support method `Add`", m.Name)) + } + + // RPC, set ok to true as return value. Need by rpc.Call reply argument + *ok = true + rpc.log.Info("new metric successfully added", "name", m.Name, "labels", m.Labels, "value", m.Value) + return nil +} + +// Sub subtract the value from the specific metric (gauge only). +func (rpc *rpcServer) Sub(m *Metric, ok *bool) error { + const op = errors.Op("Subtracting metric") + rpc.log.Info("Subtracting value from metric", "name", m.Name, "value", m.Value, "labels", m.Labels) + c, exist := rpc.svc.collectors.Load(m.Name) + if !exist { + rpc.log.Error("undefined collector", "name", m.Name, "value", m.Value, "labels", m.Labels) + return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) + } + if c == nil { + // can it be nil ??? I guess can't + return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) + } + + switch c := c.(type) { + case prometheus.Gauge: + c.Sub(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + rpc.log.Error("required labels for collector, but none was provided", "name", m.Name, "value", m.Value) + return errors.E(op, errors.Errorf("required labels for collector %s", m.Name)) + } + + gauge, err := c.GetMetricWithLabelValues(m.Labels...) + if err != nil { + rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) + return errors.E(op, err) + } + gauge.Sub(m.Value) + default: + return errors.E(op, errors.Errorf("collector `%s` does not support method `Sub`", m.Name)) + } + rpc.log.Info("Subtracting operation applied successfully", "name", m.Name, "labels", m.Labels, "value", m.Value) + + *ok = true + return nil +} + +// Observe the value (histogram and summary only). +func (rpc *rpcServer) Observe(m *Metric, ok *bool) error { + const op = errors.Op("Observe metrics") + rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels) + + c, exist := rpc.svc.collectors.Load(m.Name) + if !exist { + rpc.log.Error("undefined collector", "name", m.Name, "value", m.Value, "labels", m.Labels) + return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) + } + if c == nil { + return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) + } + + switch c := c.(type) { + case *prometheus.SummaryVec: + if len(m.Labels) == 0 { + return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) + } + + observer, err := c.GetMetricWithLabelValues(m.Labels...) + if err != nil { + return errors.E(op, err) + } + observer.Observe(m.Value) + + case prometheus.Histogram: + c.Observe(m.Value) + + case *prometheus.HistogramVec: + if len(m.Labels) == 0 { + return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name)) + } + + observer, err := c.GetMetricWithLabelValues(m.Labels...) + if err != nil { + rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) + return errors.E(op, err) + } + observer.Observe(m.Value) + default: + return errors.E(op, errors.Errorf("collector `%s` does not support method `Observe`", m.Name)) + } + + rpc.log.Info("observe operation finished successfully", "name", m.Name, "labels", m.Labels, "value", m.Value) + + *ok = true + return nil +} + +// Declare is used to register new collector in prometheus +// THE TYPES ARE: +// NamedCollector -> Collector with the name +// bool -> RPC reply value +// RETURNS: +// error +func (rpc *rpcServer) Declare(nc *NamedCollector, ok *bool) error { + const op = errors.Op("Declare metric") + rpc.log.Info("Declaring new metric", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace) + _, exist := rpc.svc.collectors.Load(nc.Name) + if exist { + rpc.log.Error("metric with provided name already exist", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace) + return errors.E(op, errors.Errorf("tried to register existing collector with the name `%s`", nc.Name)) + } + + var collector prometheus.Collector + switch nc.Type { + case Histogram: + opts := prometheus.HistogramOpts{ + Name: nc.Name, + Namespace: nc.Namespace, + Subsystem: nc.Subsystem, + Help: nc.Help, + Buckets: nc.Buckets, + } + + if len(nc.Labels) != 0 { + collector = prometheus.NewHistogramVec(opts, nc.Labels) + } else { + collector = prometheus.NewHistogram(opts) + } + case Gauge: + opts := prometheus.GaugeOpts{ + Name: nc.Name, + Namespace: nc.Namespace, + Subsystem: nc.Subsystem, + Help: nc.Help, + } + + if len(nc.Labels) != 0 { + collector = prometheus.NewGaugeVec(opts, nc.Labels) + } else { + collector = prometheus.NewGauge(opts) + } + case Counter: + opts := prometheus.CounterOpts{ + Name: nc.Name, + Namespace: nc.Namespace, + Subsystem: nc.Subsystem, + Help: nc.Help, + } + + if len(nc.Labels) != 0 { + collector = prometheus.NewCounterVec(opts, nc.Labels) + } else { + collector = prometheus.NewCounter(opts) + } + case Summary: + opts := prometheus.SummaryOpts{ + Name: nc.Name, + Namespace: nc.Namespace, + Subsystem: nc.Subsystem, + Help: nc.Help, + } + + if len(nc.Labels) != 0 { + collector = prometheus.NewSummaryVec(opts, nc.Labels) + } else { + collector = prometheus.NewSummary(opts) + } + + default: + return errors.E(op, errors.Errorf("unknown collector type %s", nc.Type)) + } + + // add collector to sync.Map + rpc.svc.collectors.Store(nc.Name, collector) + // that method might panic, we handle it by recover + err := rpc.svc.Register(collector) + if err != nil { + *ok = false + return errors.E(op, err) + } + + rpc.log.Info("metric successfully added", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace) + + *ok = true + return nil +} + +// Set the metric value (only for gaude). +func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) { + const op = errors.Op("Set metric") + rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels) + + c, exist := rpc.svc.collectors.Load(m.Name) + if !exist { + return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) + } + if c == nil { + return errors.E(op, errors.Errorf("undefined collector %s", m.Name)) + } + + switch c := c.(type) { + case prometheus.Gauge: + c.Set(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + rpc.log.Error("required labels for collector", "collector", m.Name) + return errors.E(op, errors.Errorf("required labels for collector %s", m.Name)) + } + + gauge, err := c.GetMetricWithLabelValues(m.Labels...) + if err != nil { + rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels) + return errors.E(op, err) + } + gauge.Set(m.Value) + + default: + return errors.E(op, errors.Errorf("collector `%s` does not support method Set", m.Name)) + } + + rpc.log.Info("set operation finished successfully", "name", m.Name, "labels", m.Labels, "value", m.Value) + + *ok = true + return nil +} diff --git a/plugins/metrics/tests/.rr-test.yaml b/plugins/metrics/tests/.rr-test.yaml new file mode 100644 index 00000000..79343e3c --- /dev/null +++ b/plugins/metrics/tests/.rr-test.yaml @@ -0,0 +1,13 @@ +rpc: + listen: tcp://127.0.0.1:6001 + disabled: false + +metrics: + # prometheus client address (path /metrics added automatically) + address: localhost:2112 + collect: + app_metric: + type: histogram + help: "Custom application metric" + labels: [ "type" ] + buckets: [ 0.1, 0.2, 0.3, 1.0 ]
\ No newline at end of file diff --git a/plugins/metrics/tests/docker-compose.yml b/plugins/metrics/tests/docker-compose.yml new file mode 100644 index 00000000..610633b4 --- /dev/null +++ b/plugins/metrics/tests/docker-compose.yml @@ -0,0 +1,7 @@ +version: '3.7' + +services: + prometheus: + image: prom/prometheus + ports: + - 9090:9090 diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go new file mode 100644 index 00000000..ed5d085a --- /dev/null +++ b/plugins/metrics/tests/metrics_test.go @@ -0,0 +1,754 @@ +package tests + +import ( + "io/ioutil" + "net" + "net/http" + "net/rpc" + "os" + "os/signal" + "syscall" + "testing" + "time" + + "github.com/spiral/endure" + "github.com/spiral/goridge/v2" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/metrics" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/stretchr/testify/assert" +) + +const dialAddr = "127.0.0.1:6001" +const dialNetwork = "tcp" +const getAddr = "http://localhost:2112/metrics" + +// get request and return body +func get(url string) (string, error) { + r, err := http.Get(url) + if err != nil { + return "", err + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil { + return "", err + } + + err = r.Body.Close() + if err != nil { + return "", err + } + // unsafe + return string(b), err +} + +func TestMetricsInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel)) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = ".rr-test.yaml" + + err = cont.Register(cfg) + if err != nil { + t.Fatal(err) + } + + err = cont.Register(&metrics.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = cont.Register(&rpcPlugin.Plugin{}) + if err != nil { + t.Fatal(err) + } + + err = cont.Register(&logger.ZapLogger{}) + if err != nil { + t.Fatal(err) + } + err = cont.Register(&Plugin1{}) + if err != nil { + t.Fatal(err) + } + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Second * 5) + + out, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "go_gc_duration_seconds") + + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } +} + +func TestMetricsGaugeCollector(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, "")) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = ".rr-test.yaml" + + err = cont.RegisterAll( + cfg, + &metrics.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &Plugin1{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + time.Sleep(time.Second) + tt := time.NewTimer(time.Second * 5) + + out, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + assert.Contains(t, out, "my_gauge 100") + assert.Contains(t, out, "my_gauge2 100") + + out, err = get("http://localhost:2112/metrics") + assert.NoError(t, err) + assert.Contains(t, out, "go_gc_duration_seconds") + + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } +} + +func TestMetricsDifferentRPCCalls(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, "")) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = ".rr-test.yaml" + + err = cont.RegisterAll( + cfg, + &metrics.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + go func() { + tt := time.NewTimer(time.Minute * 3) + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + t.Run("DeclareMetric", declareMetricsTest) + genericOut, err := get(getAddr) + assert.NoError(t, err) + assert.Contains(t, genericOut, "test_metrics_named_collector") + + t.Run("AddMetric", addMetricsTest) + genericOut, err = get(getAddr) + assert.NoError(t, err) + assert.Contains(t, genericOut, "test_metrics_named_collector 10000") + + t.Run("SetMetric", setMetric) + genericOut, err = get(getAddr) + assert.NoError(t, err) + assert.Contains(t, genericOut, "user_gauge_collector 100") + + t.Run("VectorMetric", vectorMetric) + genericOut, err = get(getAddr) + assert.NoError(t, err) + assert.Contains(t, genericOut, "gauge_2_collector{section=\"first\",type=\"core\"} 100") + + t.Run("MissingSection", missingSection) + t.Run("SetWithoutLabels", setWithoutLabels) + t.Run("SetOnHistogram", setOnHistogram) + t.Run("MetricSub", subMetric) + genericOut, err = get(getAddr) + assert.NoError(t, err) + assert.Contains(t, genericOut, "sub_gauge_subMetric 1") + + t.Run("SubVector", subVector) + genericOut, err = get(getAddr) + assert.NoError(t, err) + assert.Contains(t, genericOut, "sub_gauge_subVector{section=\"first\",type=\"core\"} 1") + + t.Run("RegisterHistogram", registerHistogram) + + genericOut, err = get(getAddr) + assert.NoError(t, err) + assert.Contains(t, genericOut, `TYPE histogram_registerHistogram`) + + // check buckets + assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="0.1"} 0`) + assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="0.2"} 0`) + assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="0.5"} 0`) + assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="+Inf"} 0`) + assert.Contains(t, genericOut, `histogram_registerHistogram_sum 0`) + assert.Contains(t, genericOut, `histogram_registerHistogram_count 0`) + + t.Run("CounterMetric", counterMetric) + genericOut, err = get(getAddr) + assert.NoError(t, err) + assert.Contains(t, genericOut, "HELP default_default_counter_CounterMetric test_counter") + assert.Contains(t, genericOut, `default_default_counter_CounterMetric{section="section2",type="type2"}`) + + t.Run("ObserveMetric", observeMetric) + genericOut, err = get(getAddr) + assert.NoError(t, err) + assert.Contains(t, genericOut, "observe_observeMetric") + + t.Run("ObserveMetricNotEnoughLabels", observeMetricNotEnoughLabels) + + close(sig) +} + +func observeMetricNotEnoughLabels(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "observe_observeMetricNotEnoughLabels", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Help: "test_observe", + Type: metrics.Histogram, + Labels: []string{"type", "section"}, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + ret = false + + assert.Error(t, client.Call("metrics.Observe", metrics.Metric{ + Name: "observe_observeMetric", + Value: 100.0, + Labels: []string{"test"}, + }, &ret)) + assert.False(t, ret) +} + +func observeMetric(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "observe_observeMetric", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Help: "test_observe", + Type: metrics.Histogram, + Labels: []string{"type", "section"}, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + ret = false + + assert.NoError(t, client.Call("metrics.Observe", metrics.Metric{ + Name: "observe_observeMetric", + Value: 100.0, + Labels: []string{"test", "test2"}, + }, &ret)) + assert.True(t, ret) +} + +func counterMetric(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "counter_CounterMetric", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Help: "test_counter", + Type: metrics.Counter, + Labels: []string{"type", "section"}, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + + ret = false + + assert.NoError(t, client.Call("metrics.Add", metrics.Metric{ + Name: "counter_CounterMetric", + Value: 100.0, + Labels: []string{"type2", "section2"}, + }, &ret)) + assert.True(t, ret) +} + +func registerHistogram(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "histogram_registerHistogram", + Collector: metrics.Collector{ + Help: "test_histogram", + Type: metrics.Histogram, + Buckets: []float64{0.1, 0.2, 0.5}, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + + ret = false + + m := metrics.Metric{ + Name: "histogram_registerHistogram", + Value: 10000, + Labels: nil, + } + + err = client.Call("metrics.Add", m, &ret) + assert.Error(t, err) + assert.False(t, ret) +} + +func subVector(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "sub_gauge_subVector", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Type: metrics.Gauge, + Labels: []string{"type", "section"}, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + ret = false + + m := metrics.Metric{ + Name: "sub_gauge_subVector", + Value: 100000, + Labels: []string{"core", "first"}, + } + + err = client.Call("metrics.Add", m, &ret) + assert.NoError(t, err) + assert.True(t, ret) + ret = false + + m = metrics.Metric{ + Name: "sub_gauge_subVector", + Value: 99999, + Labels: []string{"core", "first"}, + } + + err = client.Call("metrics.Sub", m, &ret) + assert.NoError(t, err) + assert.True(t, ret) +} + +func subMetric(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "sub_gauge_subMetric", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Type: metrics.Gauge, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + ret = false + + m := metrics.Metric{ + Name: "sub_gauge_subMetric", + Value: 100000, + } + + err = client.Call("metrics.Add", m, &ret) + assert.NoError(t, err) + assert.True(t, ret) + ret = false + + m = metrics.Metric{ + Name: "sub_gauge_subMetric", + Value: 99999, + } + + err = client.Call("metrics.Sub", m, &ret) + assert.NoError(t, err) + assert.True(t, ret) +} + +func setOnHistogram(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "histogram_setOnHistogram", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Type: metrics.Histogram, + Labels: []string{"type", "section"}, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + + ret = false + + m := metrics.Metric{ + Name: "gauge_setOnHistogram", + Value: 100.0, + } + + err = client.Call("metrics.Set", m, &ret) // expected 2 label values but got 1 in []string{"missing"} + assert.Error(t, err) + assert.False(t, ret) +} + +func setWithoutLabels(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "gauge_setWithoutLabels", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Type: metrics.Gauge, + Labels: []string{"type", "section"}, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + + ret = false + + m := metrics.Metric{ + Name: "gauge_setWithoutLabels", + Value: 100.0, + } + + err = client.Call("metrics.Set", m, &ret) // expected 2 label values but got 1 in []string{"missing"} + assert.Error(t, err) + assert.False(t, ret) +} + +func missingSection(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "gauge_missing_section_collector", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Type: metrics.Gauge, + Labels: []string{"type", "section"}, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + + ret = false + + m := metrics.Metric{ + Name: "gauge_missing_section_collector", + Value: 100.0, + Labels: []string{"missing"}, + } + + err = client.Call("metrics.Set", m, &ret) // expected 2 label values but got 1 in []string{"missing"} + assert.Error(t, err) + assert.False(t, ret) +} + +func vectorMetric(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "gauge_2_collector", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Type: metrics.Gauge, + Labels: []string{"type", "section"}, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + + ret = false + + m := metrics.Metric{ + Name: "gauge_2_collector", + Value: 100.0, + Labels: []string{"core", "first"}, + } + + err = client.Call("metrics.Set", m, &ret) + assert.NoError(t, err) + assert.True(t, ret) +} + +func setMetric(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "user_gauge_collector", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Type: metrics.Gauge, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) + ret = false + + m := metrics.Metric{ + Name: "user_gauge_collector", + Value: 100.0, + } + + err = client.Call("metrics.Set", m, &ret) + assert.NoError(t, err) + assert.True(t, ret) +} + +func addMetricsTest(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + m := metrics.Metric{ + Name: "test_metrics_named_collector", + Value: 10000, + Labels: nil, + } + + err = client.Call("metrics.Add", m, &ret) + assert.NoError(t, err) + assert.True(t, ret) +} + +func declareMetricsTest(t *testing.T) { + conn, err := net.Dial(dialNetwork, dialAddr) + assert.NoError(t, err) + defer func() { + _ = conn.Close() + }() + client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn)) + var ret bool + + nc := metrics.NamedCollector{ + Name: "test_metrics_named_collector", + Collector: metrics.Collector{ + Namespace: "default", + Subsystem: "default", + Type: metrics.Counter, + Help: "NO HELP!", + Labels: nil, + Buckets: nil, + }, + } + + err = client.Call("metrics.Declare", nc, &ret) + assert.NoError(t, err) + assert.True(t, ret) +} diff --git a/plugins/metrics/tests/plugin1.go b/plugins/metrics/tests/plugin1.go new file mode 100644 index 00000000..b48c415d --- /dev/null +++ b/plugins/metrics/tests/plugin1.go @@ -0,0 +1,46 @@ +package tests + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/roadrunner/v2/plugins/config" +) + +// Gauge ////////////// +type Plugin1 struct { + config config.Configurer +} + +func (p1 *Plugin1) Init(cfg config.Configurer) error { + p1.config = cfg + return nil +} + +func (p1 *Plugin1) Serve() chan error { + errCh := make(chan error, 1) + return errCh +} + +func (p1 *Plugin1) Stop() error { + return nil +} + +func (p1 *Plugin1) Name() string { + return "metrics_test.plugin1" +} + +func (p1 *Plugin1) MetricsCollector() []prometheus.Collector { + collector := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "my_gauge", + Help: "My gauge value", + }) + + collector.Set(100) + + collector2 := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "my_gauge2", + Help: "My gauge2 value", + }) + + collector2.Set(100) + return []prometheus.Collector{collector, collector2} +} diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go index 6401c0e2..82b30563 100755 --- a/plugins/rpc/plugin.go +++ b/plugins/rpc/plugin.go @@ -8,27 +8,25 @@ import ( "github.com/spiral/endure" "github.com/spiral/errors" "github.com/spiral/goridge/v2" - "github.com/spiral/roadrunner/v2/log" + "github.com/spiral/roadrunner/v2/interfaces/log" + rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc" "github.com/spiral/roadrunner/v2/plugins/config" ) -// Pluggable declares the ability to create set of public RPC methods. -type Pluggable interface { - endure.Named - - // Provides RPC methods for the given service. - RPCService() (interface{}, error) -} - // ServiceName contains default service name. const ServiceName = "RPC" +type pluggable struct { + service rpc_.RPCer + name string +} + // Plugin is RPC service. type Plugin struct { cfg Config log log.Logger rpc *rpc.Server - services []Pluggable + services []pluggable listener net.Listener closed *uint32 } @@ -69,19 +67,13 @@ func (s *Plugin) Serve() chan error { // Attach all services for i := 0; i < len(s.services); i++ { - svc, err := s.services[i].RPCService() + err := s.Register(s.services[i].name, s.services[i].service.RPC()) if err != nil { errCh <- errors.E(op, err) return errCh } - err = s.Register(s.services[i].Name(), svc) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - services = append(services, s.services[i].Name()) + services = append(services, s.services[i].name) } var err error @@ -139,9 +131,11 @@ func (s *Plugin) Collects() []interface{} { } // RegisterPlugin registers RPC service plugin. -func (s *Plugin) RegisterPlugin(p Pluggable) error { - s.services = append(s.services, p) - return nil +func (s *Plugin) RegisterPlugin(name endure.Named, p rpc_.RPCer) { + s.services = append(s.services, pluggable{ + service: p, + name: name.Name(), + }) } // Register publishes in the server the set of methods of the diff --git a/plugins/rpc/tests/plugin1.go b/plugins/rpc/tests/plugin1.go index 788e6a2c..a8d5c216 100644 --- a/plugins/rpc/tests/plugin1.go +++ b/plugins/rpc/tests/plugin1.go @@ -28,8 +28,8 @@ func (p1 *Plugin1) Name() string { return "rpc_test.plugin1" } -func (p1 *Plugin1) RPCService() (interface{}, error) { - return &PluginRpc{srv: p1}, nil +func (p1 *Plugin1) RPC() interface{} { + return &PluginRpc{srv: p1} } type PluginRpc struct { diff --git a/plugins/app/config.go b/plugins/server/config.go index eaa54e2d..147ae0f7 100644 --- a/plugins/app/config.go +++ b/plugins/server/config.go @@ -1,6 +1,10 @@ -package app +package server -import "time" +import ( + "time" + + "github.com/spiral/roadrunner/v2/interfaces/server" +) // Config config combines factory, pool and cmd configurations. type Config struct { @@ -14,7 +18,7 @@ type Config struct { Group string // Env represents application environment. - Env Env + Env server.Env // Listen defines connection method and factory to be used to connect to workers: // "pipes", "tcp://:6001", "unix://rr.sock" diff --git a/plugins/app/plugin.go b/plugins/server/plugin.go index d76961ca..e096708a 100644 --- a/plugins/app/plugin.go +++ b/plugins/server/plugin.go @@ -1,4 +1,4 @@ -package app +package server import ( "context" @@ -9,21 +9,13 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/log" + "github.com/spiral/roadrunner/v2/interfaces/log" + "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/util" ) -const ServiceName = "app" - -type Env map[string]string - -// WorkerFactory creates workers for the application. -type WorkerFactory interface { - CmdFactory(env Env) (func() *exec.Cmd, error) - NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) - NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) -} +const ServiceName = "server" // Plugin manages worker type Plugin struct { @@ -71,7 +63,7 @@ func (app *Plugin) Stop() error { } // CmdFactory provides worker command factory assocated with given context. -func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { +func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) { var cmdArgs []string // create command according to the config @@ -97,7 +89,7 @@ func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) { } // NewWorker issues new standalone worker. -func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) { +func (app *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) { const op = errors.Op("new worker") spawnCmd, err := app.CmdFactory(env) if err != nil { @@ -115,7 +107,7 @@ func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBas } // NewWorkerPool issues new worker pool. -func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) { +func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) { spawnCmd, err := app.CmdFactory(env) if err != nil { return nil, err @@ -159,7 +151,7 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) { } } -func (app *Plugin) setEnv(e Env) []string { +func (app *Plugin) setEnv(e server.Env) []string { env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay)) for k, v := range e { env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v)) diff --git a/plugins/app/tests/configs/.rr-no-app-section.yaml b/plugins/server/tests/configs/.rr-no-app-section.yaml index d129ae8a..b6e3ea93 100644 --- a/plugins/app/tests/configs/.rr-no-app-section.yaml +++ b/plugins/server/tests/configs/.rr-no-app-section.yaml @@ -1,4 +1,4 @@ -upp: +server: command: "php ../../../tests/client.php echo pipes" user: "" group: "" diff --git a/plugins/app/tests/configs/.rr-sockets.yaml b/plugins/server/tests/configs/.rr-sockets.yaml index 9bd62693..ab1239aa 100644 --- a/plugins/app/tests/configs/.rr-sockets.yaml +++ b/plugins/server/tests/configs/.rr-sockets.yaml @@ -1,4 +1,4 @@ -app: +server: command: "php socket.php" user: "" group: "" diff --git a/plugins/app/tests/configs/.rr-tcp.yaml b/plugins/server/tests/configs/.rr-tcp.yaml index c5a26d37..f53bffcc 100644 --- a/plugins/app/tests/configs/.rr-tcp.yaml +++ b/plugins/server/tests/configs/.rr-tcp.yaml @@ -1,4 +1,4 @@ -app: +server: command: "php tcp.php" user: "" group: "" diff --git a/plugins/app/tests/configs/.rr-wrong-command.yaml b/plugins/server/tests/configs/.rr-wrong-command.yaml index 4bd019d3..d2c087a6 100644 --- a/plugins/app/tests/configs/.rr-wrong-command.yaml +++ b/plugins/server/tests/configs/.rr-wrong-command.yaml @@ -1,4 +1,4 @@ -app: +server: command: "php some_absent_file.php" user: "" group: "" diff --git a/plugins/app/tests/configs/.rr-wrong-relay.yaml b/plugins/server/tests/configs/.rr-wrong-relay.yaml index d8ffe8f8..1dd73d73 100644 --- a/plugins/app/tests/configs/.rr-wrong-relay.yaml +++ b/plugins/server/tests/configs/.rr-wrong-relay.yaml @@ -1,4 +1,4 @@ -app: +server: command: "php ../../../tests/client.php echo pipes" user: "" group: "" diff --git a/plugins/app/tests/configs/.rr.yaml b/plugins/server/tests/configs/.rr.yaml index 221aff92..b6e3ea93 100644 --- a/plugins/app/tests/configs/.rr.yaml +++ b/plugins/server/tests/configs/.rr.yaml @@ -1,4 +1,4 @@ -app: +server: command: "php ../../../tests/client.php echo pipes" user: "" group: "" diff --git a/plugins/app/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index fc999718..840021eb 100644 --- a/plugins/app/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -6,14 +6,15 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/app" + "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/plugins/config" + plugin "github.com/spiral/roadrunner/v2/plugins/server" ) const ConfigSection = "app" const Response = "test" -var testPoolConfig = roadrunner.Config{ +var testPoolConfig = roadrunner.PoolConfig{ NumWorkers: 10, MaxJobs: 100, AllocateTimeout: time.Second * 10, @@ -29,11 +30,11 @@ var testPoolConfig = roadrunner.Config{ type Foo struct { configProvider config.Configurer - wf app.WorkerFactory + wf server.WorkerFactory pool roadrunner.Pool } -func (f *Foo) Init(p config.Configurer, workerFactory app.WorkerFactory) error { +func (f *Foo) Init(p config.Configurer, workerFactory server.WorkerFactory) error { f.configProvider = p f.wf = workerFactory return nil @@ -50,7 +51,7 @@ func (f *Foo) Serve() chan error { errCh := make(chan error, 1) - conf := &app.Config{} + conf := &plugin.Config{} var err error err = f.configProvider.UnmarshalKey(ConfigSection, conf) if err != nil { diff --git a/plugins/app/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go index 585264f6..b12f4ead 100644 --- a/plugins/app/tests/plugin_sockets.go +++ b/plugins/server/tests/plugin_sockets.go @@ -5,17 +5,18 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/app" + "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/plugins/config" + plugin "github.com/spiral/roadrunner/v2/plugins/server" ) type Foo2 struct { configProvider config.Configurer - wf app.WorkerFactory + wf server.WorkerFactory pool roadrunner.Pool } -func (f *Foo2) Init(p config.Configurer, workerFactory app.WorkerFactory) error { +func (f *Foo2) Init(p config.Configurer, workerFactory server.WorkerFactory) error { f.configProvider = p f.wf = workerFactory return nil @@ -25,7 +26,7 @@ func (f *Foo2) Serve() chan error { const op = errors.Op("serve") var err error errCh := make(chan error, 1) - conf := &app.Config{} + conf := &plugin.Config{} // test payload for echo r := roadrunner.Payload{ diff --git a/plugins/app/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go index 6abc533d..39044577 100644 --- a/plugins/app/tests/plugin_tcp.go +++ b/plugins/server/tests/plugin_tcp.go @@ -5,17 +5,18 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2" - "github.com/spiral/roadrunner/v2/plugins/app" + "github.com/spiral/roadrunner/v2/interfaces/server" "github.com/spiral/roadrunner/v2/plugins/config" + plugin "github.com/spiral/roadrunner/v2/plugins/server" ) type Foo3 struct { configProvider config.Configurer - wf app.WorkerFactory + wf server.WorkerFactory pool roadrunner.Pool } -func (f *Foo3) Init(p config.Configurer, workerFactory app.WorkerFactory) error { +func (f *Foo3) Init(p config.Configurer, workerFactory server.WorkerFactory) error { f.configProvider = p f.wf = workerFactory return nil @@ -25,7 +26,7 @@ func (f *Foo3) Serve() chan error { const op = errors.Op("serve") var err error errCh := make(chan error, 1) - conf := &app.Config{} + conf := &plugin.Config{} // test payload for echo r := roadrunner.Payload{ diff --git a/plugins/app/tests/app_test.go b/plugins/server/tests/server_test.go index 3c416b59..f917df5d 100644 --- a/plugins/app/tests/app_test.go +++ b/plugins/server/tests/server_test.go @@ -7,9 +7,9 @@ import ( "time" "github.com/spiral/endure" - "github.com/spiral/roadrunner/v2/plugins/app" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/server" "github.com/stretchr/testify/assert" ) @@ -27,7 +27,7 @@ func TestAppPipes(t *testing.T) { t.Fatal(err) } - err = container.Register(&app.Plugin{}) + err = container.Register(&server.Plugin{}) if err != nil { t.Fatal(err) } @@ -93,7 +93,7 @@ func TestAppSockets(t *testing.T) { t.Fatal(err) } - err = container.Register(&app.Plugin{}) + err = container.Register(&server.Plugin{}) if err != nil { t.Fatal(err) } @@ -159,7 +159,7 @@ func TestAppTCP(t *testing.T) { t.Fatal(err) } - err = container.Register(&app.Plugin{}) + err = container.Register(&server.Plugin{}) if err != nil { t.Fatal(err) } @@ -225,7 +225,7 @@ func TestAppWrongConfig(t *testing.T) { t.Fatal(err) } - err = container.Register(&app.Plugin{}) + err = container.Register(&server.Plugin{}) if err != nil { t.Fatal(err) } @@ -257,7 +257,7 @@ func TestAppWrongRelay(t *testing.T) { t.Fatal(err) } - err = container.Register(&app.Plugin{}) + err = container.Register(&server.Plugin{}) if err != nil { t.Fatal(err) } @@ -295,7 +295,7 @@ func TestAppWrongCommand(t *testing.T) { t.Fatal(err) } - err = container.Register(&app.Plugin{}) + err = container.Register(&server.Plugin{}) if err != nil { t.Fatal(err) } @@ -333,7 +333,7 @@ func TestAppNoAppSectionInConfig(t *testing.T) { t.Fatal(err) } - err = container.Register(&app.Plugin{}) + err = container.Register(&server.Plugin{}) if err != nil { t.Fatal(err) } diff --git a/plugins/app/tests/socket.php b/plugins/server/tests/socket.php index 143c3ce4..143c3ce4 100644 --- a/plugins/app/tests/socket.php +++ b/plugins/server/tests/socket.php diff --git a/plugins/app/tests/tcp.php b/plugins/server/tests/tcp.php index 2d6fb00a..2d6fb00a 100644 --- a/plugins/app/tests/tcp.php +++ b/plugins/server/tests/tcp.php |