summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'plugins')
-rw-r--r--plugins/logger/plugin.go2
-rw-r--r--plugins/logger/tests/plugin.go2
-rw-r--r--plugins/metrics/config.go135
-rw-r--r--plugins/metrics/config_test.go87
-rw-r--r--plugins/metrics/doc.go1
-rw-r--r--plugins/metrics/plugin.go230
-rw-r--r--plugins/metrics/rpc.go294
-rw-r--r--plugins/metrics/tests/.rr-test.yaml13
-rw-r--r--plugins/metrics/tests/docker-compose.yml7
-rw-r--r--plugins/metrics/tests/metrics_test.go754
-rw-r--r--plugins/metrics/tests/plugin1.go46
-rwxr-xr-xplugins/rpc/plugin.go36
-rw-r--r--plugins/rpc/tests/plugin1.go4
-rw-r--r--plugins/server/config.go (renamed from plugins/app/config.go)10
-rw-r--r--plugins/server/plugin.go (renamed from plugins/app/plugin.go)24
-rw-r--r--plugins/server/tests/configs/.rr-no-app-section.yaml (renamed from plugins/app/tests/configs/.rr-no-app-section.yaml)2
-rw-r--r--plugins/server/tests/configs/.rr-sockets.yaml (renamed from plugins/app/tests/configs/.rr-sockets.yaml)2
-rw-r--r--plugins/server/tests/configs/.rr-tcp.yaml (renamed from plugins/app/tests/configs/.rr-tcp.yaml)2
-rw-r--r--plugins/server/tests/configs/.rr-wrong-command.yaml (renamed from plugins/app/tests/configs/.rr-wrong-command.yaml)2
-rw-r--r--plugins/server/tests/configs/.rr-wrong-relay.yaml (renamed from plugins/app/tests/configs/.rr-wrong-relay.yaml)2
-rw-r--r--plugins/server/tests/configs/.rr.yaml (renamed from plugins/app/tests/configs/.rr.yaml)2
-rw-r--r--plugins/server/tests/plugin_pipes.go (renamed from plugins/app/tests/plugin_pipes.go)11
-rw-r--r--plugins/server/tests/plugin_sockets.go (renamed from plugins/app/tests/plugin_sockets.go)9
-rw-r--r--plugins/server/tests/plugin_tcp.go (renamed from plugins/app/tests/plugin_tcp.go)9
-rw-r--r--plugins/server/tests/server_test.go (renamed from plugins/app/tests/app_test.go)16
-rw-r--r--plugins/server/tests/socket.php (renamed from plugins/app/tests/socket.php)0
-rw-r--r--plugins/server/tests/tcp.php (renamed from plugins/app/tests/tcp.php)0
27 files changed, 1631 insertions, 71 deletions
diff --git a/plugins/logger/plugin.go b/plugins/logger/plugin.go
index f05d0ff0..0a8485d9 100644
--- a/plugins/logger/plugin.go
+++ b/plugins/logger/plugin.go
@@ -2,7 +2,7 @@ package logger
import (
"github.com/spiral/endure"
- "github.com/spiral/roadrunner/v2/log"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/plugins/config"
"go.uber.org/zap"
)
diff --git a/plugins/logger/tests/plugin.go b/plugins/logger/tests/plugin.go
index 75d2736d..32238f63 100644
--- a/plugins/logger/tests/plugin.go
+++ b/plugins/logger/tests/plugin.go
@@ -2,7 +2,7 @@ package tests
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/log"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
"github.com/spiral/roadrunner/v2/plugins/config"
)
diff --git a/plugins/metrics/config.go b/plugins/metrics/config.go
new file mode 100644
index 00000000..933b7eb8
--- /dev/null
+++ b/plugins/metrics/config.go
@@ -0,0 +1,135 @@
+package metrics
+
+import (
+ "fmt"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// Config configures metrics service.
+type Config struct {
+ // Address to listen
+ Address string
+
+ // Collect define application specific metrics.
+ Collect map[string]Collector
+}
+
+type NamedCollector struct {
+ // Name of the collector
+ Name string `json:"name"`
+
+ // Collector structure
+ Collector `json:"collector"`
+}
+
+// CollectorType represents prometheus collector types
+type CollectorType string
+
+const (
+ // Histogram type
+ Histogram CollectorType = "histogram"
+
+ // Gauge type
+ Gauge CollectorType = "gauge"
+
+ // Counter type
+ Counter CollectorType = "counter"
+
+ // Summary type
+ Summary CollectorType = "summary"
+)
+
+// Collector describes single application specific metric.
+type Collector struct {
+ // Namespace of the metric.
+ Namespace string `json:"namespace"`
+ // Subsystem of the metric.
+ Subsystem string `json:"subsystem"`
+ // Collector type (histogram, gauge, counter, summary).
+ Type CollectorType `json:"type"`
+ // Help of collector.
+ Help string `json:"help"`
+ // Labels for vectorized metrics.
+ Labels []string `json:"labels"`
+ // Buckets for histogram metric.
+ Buckets []float64 `json:"buckets"`
+}
+
+// register application specific metrics.
+func (c *Config) getCollectors() (map[string]prometheus.Collector, error) {
+ if c.Collect == nil {
+ return nil, nil
+ }
+
+ collectors := make(map[string]prometheus.Collector)
+
+ for name, m := range c.Collect {
+ var collector prometheus.Collector
+ switch m.Type {
+ case Histogram:
+ opts := prometheus.HistogramOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ Buckets: m.Buckets,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewHistogramVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewHistogram(opts)
+ }
+ case Gauge:
+ opts := prometheus.GaugeOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewGaugeVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewGauge(opts)
+ }
+ case Counter:
+ opts := prometheus.CounterOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewCounterVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewCounter(opts)
+ }
+ case Summary:
+ opts := prometheus.SummaryOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewSummaryVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewSummary(opts)
+ }
+ default:
+ return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name)
+ }
+
+ collectors[name] = collector
+ }
+
+ return collectors, nil
+}
+
+func (c *Config) InitDefaults() {
+
+}
diff --git a/plugins/metrics/config_test.go b/plugins/metrics/config_test.go
new file mode 100644
index 00000000..24c8406c
--- /dev/null
+++ b/plugins/metrics/config_test.go
@@ -0,0 +1,87 @@
+package metrics
+
+import (
+ "bytes"
+ "testing"
+
+ json "github.com/json-iterator/go"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_Config_Hydrate_Error1(t *testing.T) {
+ cfg := `{"request": {"From": "Something"}}`
+ c := &Config{}
+ f := new(bytes.Buffer)
+ f.WriteString(cfg)
+
+ err := json.Unmarshal(f.Bytes(), &c)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func Test_Config_Hydrate_Error2(t *testing.T) {
+ cfg := `{"dir": "/dir/"`
+ c := &Config{}
+
+ f := new(bytes.Buffer)
+ f.WriteString(cfg)
+
+ err := json.Unmarshal(f.Bytes(), &c)
+ assert.Error(t, err)
+}
+
+func Test_Config_Metrics(t *testing.T) {
+ cfg := `{
+"collect":{
+ "metric1":{"type": "gauge"},
+ "metric2":{ "type": "counter"},
+ "metric3":{"type": "summary"},
+ "metric4":{"type": "histogram"}
+}
+}`
+ c := &Config{}
+ f := new(bytes.Buffer)
+ f.WriteString(cfg)
+
+ err := json.Unmarshal(f.Bytes(), &c)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ m, err := c.getCollectors()
+ assert.NoError(t, err)
+
+ assert.IsType(t, prometheus.NewGauge(prometheus.GaugeOpts{}), m["metric1"])
+ assert.IsType(t, prometheus.NewCounter(prometheus.CounterOpts{}), m["metric2"])
+ assert.IsType(t, prometheus.NewSummary(prometheus.SummaryOpts{}), m["metric3"])
+ assert.IsType(t, prometheus.NewHistogram(prometheus.HistogramOpts{}), m["metric4"])
+}
+
+func Test_Config_MetricsVector(t *testing.T) {
+ cfg := `{
+"collect":{
+ "metric1":{"type": "gauge","labels":["label"]},
+ "metric2":{ "type": "counter","labels":["label"]},
+ "metric3":{"type": "summary","labels":["label"]},
+ "metric4":{"type": "histogram","labels":["label"]}
+}
+}`
+ c := &Config{}
+ f := new(bytes.Buffer)
+ f.WriteString(cfg)
+
+ err := json.Unmarshal(f.Bytes(), &c)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ m, err := c.getCollectors()
+ assert.NoError(t, err)
+
+ assert.IsType(t, prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{}), m["metric1"])
+ assert.IsType(t, prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{}), m["metric2"])
+ assert.IsType(t, prometheus.NewSummaryVec(prometheus.SummaryOpts{}, []string{}), m["metric3"])
+ assert.IsType(t, prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{}), m["metric4"])
+}
diff --git a/plugins/metrics/doc.go b/plugins/metrics/doc.go
new file mode 100644
index 00000000..1abe097a
--- /dev/null
+++ b/plugins/metrics/doc.go
@@ -0,0 +1 @@
+package metrics
diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go
new file mode 100644
index 00000000..3fd42ee4
--- /dev/null
+++ b/plugins/metrics/plugin.go
@@ -0,0 +1,230 @@
+package metrics
+
+import (
+ "context"
+ "crypto/tls"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/spiral/endure"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/metrics"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "golang.org/x/sys/cpu"
+)
+
+const (
+ // ID declares public service name.
+ ServiceName = "metrics"
+ // maxHeaderSize declares max header size for prometheus server
+ maxHeaderSize = 1024 * 1024 * 100 // 104MB
+)
+
+type statsProvider struct {
+ collectors []prometheus.Collector
+ name string
+}
+
+// Plugin to manage application metrics using Prometheus.
+type Plugin struct {
+ cfg Config
+ log log.Logger
+ mu sync.Mutex // all receivers are pointers
+ http *http.Server
+ collectors sync.Map // all receivers are pointers
+ registry *prometheus.Registry
+}
+
+// Init service.
+func (m *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+ const op = errors.Op("Metrics Init")
+ err := cfg.UnmarshalKey(ServiceName, &m.cfg)
+ if err != nil {
+ return err
+ }
+
+ // TODO figure out what is Init
+ m.cfg.InitDefaults()
+
+ m.log = log
+ m.registry = prometheus.NewRegistry()
+
+ // Default
+ err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Default
+ err = m.registry.Register(prometheus.NewGoCollector())
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ collectors, err := m.cfg.getCollectors()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Register invocation will be later in the Serve method
+ for k, v := range collectors {
+ m.collectors.Store(k, statsProvider{
+ collectors: []prometheus.Collector{v},
+ name: k,
+ })
+ }
+ return nil
+}
+
+// Register new prometheus collector.
+func (m *Plugin) Register(c prometheus.Collector) error {
+ return m.registry.Register(c)
+}
+
+// Serve prometheus metrics service.
+func (m *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ m.collectors.Range(func(key, value interface{}) bool {
+ // key - name
+ // value - statsProvider struct
+ c := value.(statsProvider)
+ for _, v := range c.collectors {
+ if err := m.registry.Register(v); err != nil {
+ errCh <- err
+ return false
+ }
+ }
+
+ return true
+ })
+
+ var topCipherSuites []uint16
+ var defaultCipherSuitesTLS13 []uint16
+
+ hasGCMAsmAMD64 := cpu.X86.HasAES && cpu.X86.HasPCLMULQDQ
+ hasGCMAsmARM64 := cpu.ARM64.HasAES && cpu.ARM64.HasPMULL
+ // Keep in sync with crypto/aes/cipher_s390x.go.
+ hasGCMAsmS390X := cpu.S390X.HasAES && cpu.S390X.HasAESCBC && cpu.S390X.HasAESCTR && (cpu.S390X.HasGHASH || cpu.S390X.HasAESGCM)
+
+ hasGCMAsm := hasGCMAsmAMD64 || hasGCMAsmARM64 || hasGCMAsmS390X
+
+ if hasGCMAsm {
+ // If AES-GCM hardware is provided then prioritise AES-GCM
+ // cipher suites.
+ topCipherSuites = []uint16{
+ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
+ }
+ defaultCipherSuitesTLS13 = []uint16{
+ tls.TLS_AES_128_GCM_SHA256,
+ tls.TLS_CHACHA20_POLY1305_SHA256,
+ tls.TLS_AES_256_GCM_SHA384,
+ }
+ } else {
+ // Without AES-GCM hardware, we put the ChaCha20-Poly1305
+ // cipher suites first.
+ topCipherSuites = []uint16{
+ tls.TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305,
+ tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
+ tls.TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,
+ }
+ defaultCipherSuitesTLS13 = []uint16{
+ tls.TLS_CHACHA20_POLY1305_SHA256,
+ tls.TLS_AES_128_GCM_SHA256,
+ tls.TLS_AES_256_GCM_SHA384,
+ }
+ }
+
+ DefaultCipherSuites := make([]uint16, 0, 22)
+ DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...)
+ DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
+
+ m.http = &http.Server{
+ Addr: m.cfg.Address,
+ Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
+ IdleTimeout: time.Hour * 24,
+ ReadTimeout: time.Minute * 60,
+ MaxHeaderBytes: maxHeaderSize,
+ ReadHeaderTimeout: time.Minute * 60,
+ WriteTimeout: time.Minute * 60,
+ TLSConfig: &tls.Config{
+ CurvePreferences: []tls.CurveID{
+ tls.CurveP256,
+ tls.CurveP384,
+ tls.CurveP521,
+ tls.X25519,
+ },
+ CipherSuites: DefaultCipherSuites,
+ MinVersion: tls.VersionTLS12,
+ PreferServerCipherSuites: true,
+ },
+ }
+
+ go func() {
+ err := m.http.ListenAndServe()
+ if err != nil && err != http.ErrServerClosed {
+ errCh <- err
+ return
+ }
+ }()
+
+ return errCh
+}
+
+// Stop prometheus metrics service.
+func (m *Plugin) Stop() error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.http != nil {
+ // timeout is 10 seconds
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+ defer cancel()
+ err := m.http.Shutdown(ctx)
+ if err != nil {
+ // Function should be Stop() error
+ m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
+ }
+ }
+ return nil
+}
+
+// Collects used to collect all plugins which implement metrics.StatProvider interface (and Named)
+func (m *Plugin) Collects() []interface{} {
+ return []interface{}{
+ m.AddStatProvider,
+ }
+}
+
+// Collector returns application specific collector by name or nil if collector not found.
+func (m *Plugin) AddStatProvider(name endure.Named, stat metrics.StatProvider) error {
+ m.collectors.Store(name.Name(), statsProvider{
+ collectors: stat.MetricsCollector(),
+ name: name.Name(),
+ })
+ return nil
+}
+
+// RPC interface satisfaction
+func (m *Plugin) Name() string {
+ return ServiceName
+}
+
+// RPC interface satisfaction
+func (m *Plugin) RPC() interface{} {
+ return &rpcServer{
+ svc: m,
+ log: m.log,
+ }
+}
diff --git a/plugins/metrics/rpc.go b/plugins/metrics/rpc.go
new file mode 100644
index 00000000..b8897098
--- /dev/null
+++ b/plugins/metrics/rpc.go
@@ -0,0 +1,294 @@
+package metrics
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+)
+
+type rpcServer struct {
+ svc *Plugin
+ log log.Logger
+}
+
+// Metric represent single metric produced by the application.
+type Metric struct {
+ // Collector name.
+ Name string
+
+ // Collector value.
+ Value float64
+
+ // Labels associated with metric. Only for vector metrics. Must be provided in a form of label values.
+ Labels []string
+}
+
+// Add new metric to the designated collector.
+func (rpc *rpcServer) Add(m *Metric, ok *bool) error {
+ const op = errors.Op("Add metric")
+ rpc.log.Info("Adding metric", "name", m.Name, "value", m.Value, "labels", m.Labels)
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ rpc.log.Error("undefined collector", "collector", m.Name)
+ return errors.E(op, errors.Errorf("undefined collector %s, try first Declare the desired collector", m.Name))
+ }
+
+ switch c := c.(type) {
+ case prometheus.Gauge:
+ c.Add(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ rpc.log.Error("required labels for collector", "collector", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector %s", m.Name))
+ }
+
+ gauge, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ gauge.Add(m.Value)
+ case prometheus.Counter:
+ c.Add(m.Value)
+
+ case *prometheus.CounterVec:
+ if len(m.Labels) == 0 {
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
+ }
+
+ gauge, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ gauge.Add(m.Value)
+
+ default:
+ return errors.E(op, errors.Errorf("collector %s does not support method `Add`", m.Name))
+ }
+
+ // RPC, set ok to true as return value. Need by rpc.Call reply argument
+ *ok = true
+ rpc.log.Info("new metric successfully added", "name", m.Name, "labels", m.Labels, "value", m.Value)
+ return nil
+}
+
+// Sub subtract the value from the specific metric (gauge only).
+func (rpc *rpcServer) Sub(m *Metric, ok *bool) error {
+ const op = errors.Op("Subtracting metric")
+ rpc.log.Info("Subtracting value from metric", "name", m.Name, "value", m.Value, "labels", m.Labels)
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ rpc.log.Error("undefined collector", "name", m.Name, "value", m.Value, "labels", m.Labels)
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+ if c == nil {
+ // can it be nil ??? I guess can't
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+
+ switch c := c.(type) {
+ case prometheus.Gauge:
+ c.Sub(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ rpc.log.Error("required labels for collector, but none was provided", "name", m.Name, "value", m.Value)
+ return errors.E(op, errors.Errorf("required labels for collector %s", m.Name))
+ }
+
+ gauge, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ gauge.Sub(m.Value)
+ default:
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Sub`", m.Name))
+ }
+ rpc.log.Info("Subtracting operation applied successfully", "name", m.Name, "labels", m.Labels, "value", m.Value)
+
+ *ok = true
+ return nil
+}
+
+// Observe the value (histogram and summary only).
+func (rpc *rpcServer) Observe(m *Metric, ok *bool) error {
+ const op = errors.Op("Observe metrics")
+ rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels)
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ rpc.log.Error("undefined collector", "name", m.Name, "value", m.Value, "labels", m.Labels)
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+ if c == nil {
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+
+ switch c := c.(type) {
+ case *prometheus.SummaryVec:
+ if len(m.Labels) == 0 {
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
+ }
+
+ observer, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ observer.Observe(m.Value)
+
+ case prometheus.Histogram:
+ c.Observe(m.Value)
+
+ case *prometheus.HistogramVec:
+ if len(m.Labels) == 0 {
+ return errors.E(op, errors.Errorf("required labels for collector `%s`", m.Name))
+ }
+
+ observer, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ observer.Observe(m.Value)
+ default:
+ return errors.E(op, errors.Errorf("collector `%s` does not support method `Observe`", m.Name))
+ }
+
+ rpc.log.Info("observe operation finished successfully", "name", m.Name, "labels", m.Labels, "value", m.Value)
+
+ *ok = true
+ return nil
+}
+
+// Declare is used to register new collector in prometheus
+// THE TYPES ARE:
+// NamedCollector -> Collector with the name
+// bool -> RPC reply value
+// RETURNS:
+// error
+func (rpc *rpcServer) Declare(nc *NamedCollector, ok *bool) error {
+ const op = errors.Op("Declare metric")
+ rpc.log.Info("Declaring new metric", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace)
+ _, exist := rpc.svc.collectors.Load(nc.Name)
+ if exist {
+ rpc.log.Error("metric with provided name already exist", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace)
+ return errors.E(op, errors.Errorf("tried to register existing collector with the name `%s`", nc.Name))
+ }
+
+ var collector prometheus.Collector
+ switch nc.Type {
+ case Histogram:
+ opts := prometheus.HistogramOpts{
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ Buckets: nc.Buckets,
+ }
+
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewHistogramVec(opts, nc.Labels)
+ } else {
+ collector = prometheus.NewHistogram(opts)
+ }
+ case Gauge:
+ opts := prometheus.GaugeOpts{
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ }
+
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewGaugeVec(opts, nc.Labels)
+ } else {
+ collector = prometheus.NewGauge(opts)
+ }
+ case Counter:
+ opts := prometheus.CounterOpts{
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ }
+
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewCounterVec(opts, nc.Labels)
+ } else {
+ collector = prometheus.NewCounter(opts)
+ }
+ case Summary:
+ opts := prometheus.SummaryOpts{
+ Name: nc.Name,
+ Namespace: nc.Namespace,
+ Subsystem: nc.Subsystem,
+ Help: nc.Help,
+ }
+
+ if len(nc.Labels) != 0 {
+ collector = prometheus.NewSummaryVec(opts, nc.Labels)
+ } else {
+ collector = prometheus.NewSummary(opts)
+ }
+
+ default:
+ return errors.E(op, errors.Errorf("unknown collector type %s", nc.Type))
+ }
+
+ // add collector to sync.Map
+ rpc.svc.collectors.Store(nc.Name, collector)
+ // that method might panic, we handle it by recover
+ err := rpc.svc.Register(collector)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+
+ rpc.log.Info("metric successfully added", "name", nc.Name, "type", nc.Type, "namespace", nc.Namespace)
+
+ *ok = true
+ return nil
+}
+
+// Set the metric value (only for gaude).
+func (rpc *rpcServer) Set(m *Metric, ok *bool) (err error) {
+ const op = errors.Op("Set metric")
+ rpc.log.Info("Observing metric", "name", m.Name, "value", m.Value, "labels", m.Labels)
+
+ c, exist := rpc.svc.collectors.Load(m.Name)
+ if !exist {
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+ if c == nil {
+ return errors.E(op, errors.Errorf("undefined collector %s", m.Name))
+ }
+
+ switch c := c.(type) {
+ case prometheus.Gauge:
+ c.Set(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ rpc.log.Error("required labels for collector", "collector", m.Name)
+ return errors.E(op, errors.Errorf("required labels for collector %s", m.Name))
+ }
+
+ gauge, err := c.GetMetricWithLabelValues(m.Labels...)
+ if err != nil {
+ rpc.log.Error("failed to get metrics with label values", "collector", m.Name, "labels", m.Labels)
+ return errors.E(op, err)
+ }
+ gauge.Set(m.Value)
+
+ default:
+ return errors.E(op, errors.Errorf("collector `%s` does not support method Set", m.Name))
+ }
+
+ rpc.log.Info("set operation finished successfully", "name", m.Name, "labels", m.Labels, "value", m.Value)
+
+ *ok = true
+ return nil
+}
diff --git a/plugins/metrics/tests/.rr-test.yaml b/plugins/metrics/tests/.rr-test.yaml
new file mode 100644
index 00000000..79343e3c
--- /dev/null
+++ b/plugins/metrics/tests/.rr-test.yaml
@@ -0,0 +1,13 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+ disabled: false
+
+metrics:
+ # prometheus client address (path /metrics added automatically)
+ address: localhost:2112
+ collect:
+ app_metric:
+ type: histogram
+ help: "Custom application metric"
+ labels: [ "type" ]
+ buckets: [ 0.1, 0.2, 0.3, 1.0 ] \ No newline at end of file
diff --git a/plugins/metrics/tests/docker-compose.yml b/plugins/metrics/tests/docker-compose.yml
new file mode 100644
index 00000000..610633b4
--- /dev/null
+++ b/plugins/metrics/tests/docker-compose.yml
@@ -0,0 +1,7 @@
+version: '3.7'
+
+services:
+ prometheus:
+ image: prom/prometheus
+ ports:
+ - 9090:9090
diff --git a/plugins/metrics/tests/metrics_test.go b/plugins/metrics/tests/metrics_test.go
new file mode 100644
index 00000000..ed5d085a
--- /dev/null
+++ b/plugins/metrics/tests/metrics_test.go
@@ -0,0 +1,754 @@
+package tests
+
+import (
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ "github.com/spiral/goridge/v2"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/metrics"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/stretchr/testify/assert"
+)
+
+const dialAddr = "127.0.0.1:6001"
+const dialNetwork = "tcp"
+const getAddr = "http://localhost:2112/metrics"
+
+// get request and return body
+func get(url string) (string, error) {
+ r, err := http.Get(url)
+ if err != nil {
+ return "", err
+ }
+
+ b, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return "", err
+ }
+
+ err = r.Body.Close()
+ if err != nil {
+ return "", err
+ }
+ // unsafe
+ return string(b), err
+}
+
+func TestMetricsInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ cfg := &config.Viper{}
+ cfg.Prefix = "rr"
+ cfg.Path = ".rr-test.yaml"
+
+ err = cont.Register(cfg)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&metrics.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&rpcPlugin.Plugin{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Register(&logger.ZapLogger{})
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = cont.Register(&Plugin1{})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ tt := time.NewTimer(time.Second * 5)
+
+ out, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+
+ assert.Contains(t, out, "go_gc_duration_seconds")
+
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+}
+
+func TestMetricsGaugeCollector(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, ""))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ cfg := &config.Viper{}
+ cfg.Prefix = "rr"
+ cfg.Path = ".rr-test.yaml"
+
+ err = cont.RegisterAll(
+ cfg,
+ &metrics.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &Plugin1{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ time.Sleep(time.Second)
+ tt := time.NewTimer(time.Second * 5)
+
+ out, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+ assert.Contains(t, out, "my_gauge 100")
+ assert.Contains(t, out, "my_gauge2 100")
+
+ out, err = get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+ assert.Contains(t, out, "go_gc_duration_seconds")
+
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+}
+
+func TestMetricsDifferentRPCCalls(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel), endure.Visualize(endure.StdOut, ""))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ cfg := &config.Viper{}
+ cfg.Prefix = "rr"
+ cfg.Path = ".rr-test.yaml"
+
+ err = cont.RegisterAll(
+ cfg,
+ &metrics.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ go func() {
+ tt := time.NewTimer(time.Minute * 3)
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ t.Run("DeclareMetric", declareMetricsTest)
+ genericOut, err := get(getAddr)
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, "test_metrics_named_collector")
+
+ t.Run("AddMetric", addMetricsTest)
+ genericOut, err = get(getAddr)
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, "test_metrics_named_collector 10000")
+
+ t.Run("SetMetric", setMetric)
+ genericOut, err = get(getAddr)
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, "user_gauge_collector 100")
+
+ t.Run("VectorMetric", vectorMetric)
+ genericOut, err = get(getAddr)
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, "gauge_2_collector{section=\"first\",type=\"core\"} 100")
+
+ t.Run("MissingSection", missingSection)
+ t.Run("SetWithoutLabels", setWithoutLabels)
+ t.Run("SetOnHistogram", setOnHistogram)
+ t.Run("MetricSub", subMetric)
+ genericOut, err = get(getAddr)
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, "sub_gauge_subMetric 1")
+
+ t.Run("SubVector", subVector)
+ genericOut, err = get(getAddr)
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, "sub_gauge_subVector{section=\"first\",type=\"core\"} 1")
+
+ t.Run("RegisterHistogram", registerHistogram)
+
+ genericOut, err = get(getAddr)
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, `TYPE histogram_registerHistogram`)
+
+ // check buckets
+ assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="0.1"} 0`)
+ assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="0.2"} 0`)
+ assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="0.5"} 0`)
+ assert.Contains(t, genericOut, `histogram_registerHistogram_bucket{le="+Inf"} 0`)
+ assert.Contains(t, genericOut, `histogram_registerHistogram_sum 0`)
+ assert.Contains(t, genericOut, `histogram_registerHistogram_count 0`)
+
+ t.Run("CounterMetric", counterMetric)
+ genericOut, err = get(getAddr)
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, "HELP default_default_counter_CounterMetric test_counter")
+ assert.Contains(t, genericOut, `default_default_counter_CounterMetric{section="section2",type="type2"}`)
+
+ t.Run("ObserveMetric", observeMetric)
+ genericOut, err = get(getAddr)
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, "observe_observeMetric")
+
+ t.Run("ObserveMetricNotEnoughLabels", observeMetricNotEnoughLabels)
+
+ close(sig)
+}
+
+func observeMetricNotEnoughLabels(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "observe_observeMetricNotEnoughLabels",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Help: "test_observe",
+ Type: metrics.Histogram,
+ Labels: []string{"type", "section"},
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+ ret = false
+
+ assert.Error(t, client.Call("metrics.Observe", metrics.Metric{
+ Name: "observe_observeMetric",
+ Value: 100.0,
+ Labels: []string{"test"},
+ }, &ret))
+ assert.False(t, ret)
+}
+
+func observeMetric(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "observe_observeMetric",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Help: "test_observe",
+ Type: metrics.Histogram,
+ Labels: []string{"type", "section"},
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+ ret = false
+
+ assert.NoError(t, client.Call("metrics.Observe", metrics.Metric{
+ Name: "observe_observeMetric",
+ Value: 100.0,
+ Labels: []string{"test", "test2"},
+ }, &ret))
+ assert.True(t, ret)
+}
+
+func counterMetric(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "counter_CounterMetric",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Help: "test_counter",
+ Type: metrics.Counter,
+ Labels: []string{"type", "section"},
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+
+ ret = false
+
+ assert.NoError(t, client.Call("metrics.Add", metrics.Metric{
+ Name: "counter_CounterMetric",
+ Value: 100.0,
+ Labels: []string{"type2", "section2"},
+ }, &ret))
+ assert.True(t, ret)
+}
+
+func registerHistogram(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "histogram_registerHistogram",
+ Collector: metrics.Collector{
+ Help: "test_histogram",
+ Type: metrics.Histogram,
+ Buckets: []float64{0.1, 0.2, 0.5},
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+
+ ret = false
+
+ m := metrics.Metric{
+ Name: "histogram_registerHistogram",
+ Value: 10000,
+ Labels: nil,
+ }
+
+ err = client.Call("metrics.Add", m, &ret)
+ assert.Error(t, err)
+ assert.False(t, ret)
+}
+
+func subVector(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "sub_gauge_subVector",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Type: metrics.Gauge,
+ Labels: []string{"type", "section"},
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+ ret = false
+
+ m := metrics.Metric{
+ Name: "sub_gauge_subVector",
+ Value: 100000,
+ Labels: []string{"core", "first"},
+ }
+
+ err = client.Call("metrics.Add", m, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+ ret = false
+
+ m = metrics.Metric{
+ Name: "sub_gauge_subVector",
+ Value: 99999,
+ Labels: []string{"core", "first"},
+ }
+
+ err = client.Call("metrics.Sub", m, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+}
+
+func subMetric(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "sub_gauge_subMetric",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Type: metrics.Gauge,
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+ ret = false
+
+ m := metrics.Metric{
+ Name: "sub_gauge_subMetric",
+ Value: 100000,
+ }
+
+ err = client.Call("metrics.Add", m, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+ ret = false
+
+ m = metrics.Metric{
+ Name: "sub_gauge_subMetric",
+ Value: 99999,
+ }
+
+ err = client.Call("metrics.Sub", m, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+}
+
+func setOnHistogram(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "histogram_setOnHistogram",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Type: metrics.Histogram,
+ Labels: []string{"type", "section"},
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+
+ ret = false
+
+ m := metrics.Metric{
+ Name: "gauge_setOnHistogram",
+ Value: 100.0,
+ }
+
+ err = client.Call("metrics.Set", m, &ret) // expected 2 label values but got 1 in []string{"missing"}
+ assert.Error(t, err)
+ assert.False(t, ret)
+}
+
+func setWithoutLabels(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "gauge_setWithoutLabels",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Type: metrics.Gauge,
+ Labels: []string{"type", "section"},
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+
+ ret = false
+
+ m := metrics.Metric{
+ Name: "gauge_setWithoutLabels",
+ Value: 100.0,
+ }
+
+ err = client.Call("metrics.Set", m, &ret) // expected 2 label values but got 1 in []string{"missing"}
+ assert.Error(t, err)
+ assert.False(t, ret)
+}
+
+func missingSection(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "gauge_missing_section_collector",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Type: metrics.Gauge,
+ Labels: []string{"type", "section"},
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+
+ ret = false
+
+ m := metrics.Metric{
+ Name: "gauge_missing_section_collector",
+ Value: 100.0,
+ Labels: []string{"missing"},
+ }
+
+ err = client.Call("metrics.Set", m, &ret) // expected 2 label values but got 1 in []string{"missing"}
+ assert.Error(t, err)
+ assert.False(t, ret)
+}
+
+func vectorMetric(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "gauge_2_collector",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Type: metrics.Gauge,
+ Labels: []string{"type", "section"},
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+
+ ret = false
+
+ m := metrics.Metric{
+ Name: "gauge_2_collector",
+ Value: 100.0,
+ Labels: []string{"core", "first"},
+ }
+
+ err = client.Call("metrics.Set", m, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+}
+
+func setMetric(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "user_gauge_collector",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Type: metrics.Gauge,
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+ ret = false
+
+ m := metrics.Metric{
+ Name: "user_gauge_collector",
+ Value: 100.0,
+ }
+
+ err = client.Call("metrics.Set", m, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+}
+
+func addMetricsTest(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ m := metrics.Metric{
+ Name: "test_metrics_named_collector",
+ Value: 10000,
+ Labels: nil,
+ }
+
+ err = client.Call("metrics.Add", m, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+}
+
+func declareMetricsTest(t *testing.T) {
+ conn, err := net.Dial(dialNetwork, dialAddr)
+ assert.NoError(t, err)
+ defer func() {
+ _ = conn.Close()
+ }()
+ client := rpc.NewClientWithCodec(goridge.NewClientCodec(conn))
+ var ret bool
+
+ nc := metrics.NamedCollector{
+ Name: "test_metrics_named_collector",
+ Collector: metrics.Collector{
+ Namespace: "default",
+ Subsystem: "default",
+ Type: metrics.Counter,
+ Help: "NO HELP!",
+ Labels: nil,
+ Buckets: nil,
+ },
+ }
+
+ err = client.Call("metrics.Declare", nc, &ret)
+ assert.NoError(t, err)
+ assert.True(t, ret)
+}
diff --git a/plugins/metrics/tests/plugin1.go b/plugins/metrics/tests/plugin1.go
new file mode 100644
index 00000000..b48c415d
--- /dev/null
+++ b/plugins/metrics/tests/plugin1.go
@@ -0,0 +1,46 @@
+package tests
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+// Gauge //////////////
+type Plugin1 struct {
+ config config.Configurer
+}
+
+func (p1 *Plugin1) Init(cfg config.Configurer) error {
+ p1.config = cfg
+ return nil
+}
+
+func (p1 *Plugin1) Serve() chan error {
+ errCh := make(chan error, 1)
+ return errCh
+}
+
+func (p1 *Plugin1) Stop() error {
+ return nil
+}
+
+func (p1 *Plugin1) Name() string {
+ return "metrics_test.plugin1"
+}
+
+func (p1 *Plugin1) MetricsCollector() []prometheus.Collector {
+ collector := prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "my_gauge",
+ Help: "My gauge value",
+ })
+
+ collector.Set(100)
+
+ collector2 := prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "my_gauge2",
+ Help: "My gauge2 value",
+ })
+
+ collector2.Set(100)
+ return []prometheus.Collector{collector, collector2}
+}
diff --git a/plugins/rpc/plugin.go b/plugins/rpc/plugin.go
index 6401c0e2..82b30563 100755
--- a/plugins/rpc/plugin.go
+++ b/plugins/rpc/plugin.go
@@ -8,27 +8,25 @@ import (
"github.com/spiral/endure"
"github.com/spiral/errors"
"github.com/spiral/goridge/v2"
- "github.com/spiral/roadrunner/v2/log"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+ rpc_ "github.com/spiral/roadrunner/v2/interfaces/rpc"
"github.com/spiral/roadrunner/v2/plugins/config"
)
-// Pluggable declares the ability to create set of public RPC methods.
-type Pluggable interface {
- endure.Named
-
- // Provides RPC methods for the given service.
- RPCService() (interface{}, error)
-}
-
// ServiceName contains default service name.
const ServiceName = "RPC"
+type pluggable struct {
+ service rpc_.RPCer
+ name string
+}
+
// Plugin is RPC service.
type Plugin struct {
cfg Config
log log.Logger
rpc *rpc.Server
- services []Pluggable
+ services []pluggable
listener net.Listener
closed *uint32
}
@@ -69,19 +67,13 @@ func (s *Plugin) Serve() chan error {
// Attach all services
for i := 0; i < len(s.services); i++ {
- svc, err := s.services[i].RPCService()
+ err := s.Register(s.services[i].name, s.services[i].service.RPC())
if err != nil {
errCh <- errors.E(op, err)
return errCh
}
- err = s.Register(s.services[i].Name(), svc)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- services = append(services, s.services[i].Name())
+ services = append(services, s.services[i].name)
}
var err error
@@ -139,9 +131,11 @@ func (s *Plugin) Collects() []interface{} {
}
// RegisterPlugin registers RPC service plugin.
-func (s *Plugin) RegisterPlugin(p Pluggable) error {
- s.services = append(s.services, p)
- return nil
+func (s *Plugin) RegisterPlugin(name endure.Named, p rpc_.RPCer) {
+ s.services = append(s.services, pluggable{
+ service: p,
+ name: name.Name(),
+ })
}
// Register publishes in the server the set of methods of the
diff --git a/plugins/rpc/tests/plugin1.go b/plugins/rpc/tests/plugin1.go
index 788e6a2c..a8d5c216 100644
--- a/plugins/rpc/tests/plugin1.go
+++ b/plugins/rpc/tests/plugin1.go
@@ -28,8 +28,8 @@ func (p1 *Plugin1) Name() string {
return "rpc_test.plugin1"
}
-func (p1 *Plugin1) RPCService() (interface{}, error) {
- return &PluginRpc{srv: p1}, nil
+func (p1 *Plugin1) RPC() interface{} {
+ return &PluginRpc{srv: p1}
}
type PluginRpc struct {
diff --git a/plugins/app/config.go b/plugins/server/config.go
index eaa54e2d..147ae0f7 100644
--- a/plugins/app/config.go
+++ b/plugins/server/config.go
@@ -1,6 +1,10 @@
-package app
+package server
-import "time"
+import (
+ "time"
+
+ "github.com/spiral/roadrunner/v2/interfaces/server"
+)
// Config config combines factory, pool and cmd configurations.
type Config struct {
@@ -14,7 +18,7 @@ type Config struct {
Group string
// Env represents application environment.
- Env Env
+ Env server.Env
// Listen defines connection method and factory to be used to connect to workers:
// "pipes", "tcp://:6001", "unix://rr.sock"
diff --git a/plugins/app/plugin.go b/plugins/server/plugin.go
index d76961ca..e096708a 100644
--- a/plugins/app/plugin.go
+++ b/plugins/server/plugin.go
@@ -1,4 +1,4 @@
-package app
+package server
import (
"context"
@@ -9,21 +9,13 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/log"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/util"
)
-const ServiceName = "app"
-
-type Env map[string]string
-
-// WorkerFactory creates workers for the application.
-type WorkerFactory interface {
- CmdFactory(env Env) (func() *exec.Cmd, error)
- NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error)
- NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error)
-}
+const ServiceName = "server"
// Plugin manages worker
type Plugin struct {
@@ -71,7 +63,7 @@ func (app *Plugin) Stop() error {
}
// CmdFactory provides worker command factory assocated with given context.
-func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
+func (app *Plugin) CmdFactory(env server.Env) (func() *exec.Cmd, error) {
var cmdArgs []string
// create command according to the config
@@ -97,7 +89,7 @@ func (app *Plugin) CmdFactory(env Env) (func() *exec.Cmd, error) {
}
// NewWorker issues new standalone worker.
-func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBase, error) {
+func (app *Plugin) NewWorker(ctx context.Context, env server.Env) (roadrunner.WorkerBase, error) {
const op = errors.Op("new worker")
spawnCmd, err := app.CmdFactory(env)
if err != nil {
@@ -115,7 +107,7 @@ func (app *Plugin) NewWorker(ctx context.Context, env Env) (roadrunner.WorkerBas
}
// NewWorkerPool issues new worker pool.
-func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.Config, env Env) (roadrunner.Pool, error) {
+func (app *Plugin) NewWorkerPool(ctx context.Context, opt roadrunner.PoolConfig, env server.Env) (roadrunner.Pool, error) {
spawnCmd, err := app.CmdFactory(env)
if err != nil {
return nil, err
@@ -159,7 +151,7 @@ func (app *Plugin) initFactory() (roadrunner.Factory, error) {
}
}
-func (app *Plugin) setEnv(e Env) []string {
+func (app *Plugin) setEnv(e server.Env) []string {
env := append(os.Environ(), fmt.Sprintf("RR_RELAY=%s", app.cfg.Relay))
for k, v := range e {
env = append(env, fmt.Sprintf("%s=%s", strings.ToUpper(k), v))
diff --git a/plugins/app/tests/configs/.rr-no-app-section.yaml b/plugins/server/tests/configs/.rr-no-app-section.yaml
index d129ae8a..b6e3ea93 100644
--- a/plugins/app/tests/configs/.rr-no-app-section.yaml
+++ b/plugins/server/tests/configs/.rr-no-app-section.yaml
@@ -1,4 +1,4 @@
-upp:
+server:
command: "php ../../../tests/client.php echo pipes"
user: ""
group: ""
diff --git a/plugins/app/tests/configs/.rr-sockets.yaml b/plugins/server/tests/configs/.rr-sockets.yaml
index 9bd62693..ab1239aa 100644
--- a/plugins/app/tests/configs/.rr-sockets.yaml
+++ b/plugins/server/tests/configs/.rr-sockets.yaml
@@ -1,4 +1,4 @@
-app:
+server:
command: "php socket.php"
user: ""
group: ""
diff --git a/plugins/app/tests/configs/.rr-tcp.yaml b/plugins/server/tests/configs/.rr-tcp.yaml
index c5a26d37..f53bffcc 100644
--- a/plugins/app/tests/configs/.rr-tcp.yaml
+++ b/plugins/server/tests/configs/.rr-tcp.yaml
@@ -1,4 +1,4 @@
-app:
+server:
command: "php tcp.php"
user: ""
group: ""
diff --git a/plugins/app/tests/configs/.rr-wrong-command.yaml b/plugins/server/tests/configs/.rr-wrong-command.yaml
index 4bd019d3..d2c087a6 100644
--- a/plugins/app/tests/configs/.rr-wrong-command.yaml
+++ b/plugins/server/tests/configs/.rr-wrong-command.yaml
@@ -1,4 +1,4 @@
-app:
+server:
command: "php some_absent_file.php"
user: ""
group: ""
diff --git a/plugins/app/tests/configs/.rr-wrong-relay.yaml b/plugins/server/tests/configs/.rr-wrong-relay.yaml
index d8ffe8f8..1dd73d73 100644
--- a/plugins/app/tests/configs/.rr-wrong-relay.yaml
+++ b/plugins/server/tests/configs/.rr-wrong-relay.yaml
@@ -1,4 +1,4 @@
-app:
+server:
command: "php ../../../tests/client.php echo pipes"
user: ""
group: ""
diff --git a/plugins/app/tests/configs/.rr.yaml b/plugins/server/tests/configs/.rr.yaml
index 221aff92..b6e3ea93 100644
--- a/plugins/app/tests/configs/.rr.yaml
+++ b/plugins/server/tests/configs/.rr.yaml
@@ -1,4 +1,4 @@
-app:
+server:
command: "php ../../../tests/client.php echo pipes"
user: ""
group: ""
diff --git a/plugins/app/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go
index fc999718..840021eb 100644
--- a/plugins/app/tests/plugin_pipes.go
+++ b/plugins/server/tests/plugin_pipes.go
@@ -6,14 +6,15 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/plugins/config"
+ plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
const ConfigSection = "app"
const Response = "test"
-var testPoolConfig = roadrunner.Config{
+var testPoolConfig = roadrunner.PoolConfig{
NumWorkers: 10,
MaxJobs: 100,
AllocateTimeout: time.Second * 10,
@@ -29,11 +30,11 @@ var testPoolConfig = roadrunner.Config{
type Foo struct {
configProvider config.Configurer
- wf app.WorkerFactory
+ wf server.WorkerFactory
pool roadrunner.Pool
}
-func (f *Foo) Init(p config.Configurer, workerFactory app.WorkerFactory) error {
+func (f *Foo) Init(p config.Configurer, workerFactory server.WorkerFactory) error {
f.configProvider = p
f.wf = workerFactory
return nil
@@ -50,7 +51,7 @@ func (f *Foo) Serve() chan error {
errCh := make(chan error, 1)
- conf := &app.Config{}
+ conf := &plugin.Config{}
var err error
err = f.configProvider.UnmarshalKey(ConfigSection, conf)
if err != nil {
diff --git a/plugins/app/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go
index 585264f6..b12f4ead 100644
--- a/plugins/app/tests/plugin_sockets.go
+++ b/plugins/server/tests/plugin_sockets.go
@@ -5,17 +5,18 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/plugins/config"
+ plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
type Foo2 struct {
configProvider config.Configurer
- wf app.WorkerFactory
+ wf server.WorkerFactory
pool roadrunner.Pool
}
-func (f *Foo2) Init(p config.Configurer, workerFactory app.WorkerFactory) error {
+func (f *Foo2) Init(p config.Configurer, workerFactory server.WorkerFactory) error {
f.configProvider = p
f.wf = workerFactory
return nil
@@ -25,7 +26,7 @@ func (f *Foo2) Serve() chan error {
const op = errors.Op("serve")
var err error
errCh := make(chan error, 1)
- conf := &app.Config{}
+ conf := &plugin.Config{}
// test payload for echo
r := roadrunner.Payload{
diff --git a/plugins/app/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go
index 6abc533d..39044577 100644
--- a/plugins/app/tests/plugin_tcp.go
+++ b/plugins/server/tests/plugin_tcp.go
@@ -5,17 +5,18 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2"
- "github.com/spiral/roadrunner/v2/plugins/app"
+ "github.com/spiral/roadrunner/v2/interfaces/server"
"github.com/spiral/roadrunner/v2/plugins/config"
+ plugin "github.com/spiral/roadrunner/v2/plugins/server"
)
type Foo3 struct {
configProvider config.Configurer
- wf app.WorkerFactory
+ wf server.WorkerFactory
pool roadrunner.Pool
}
-func (f *Foo3) Init(p config.Configurer, workerFactory app.WorkerFactory) error {
+func (f *Foo3) Init(p config.Configurer, workerFactory server.WorkerFactory) error {
f.configProvider = p
f.wf = workerFactory
return nil
@@ -25,7 +26,7 @@ func (f *Foo3) Serve() chan error {
const op = errors.Op("serve")
var err error
errCh := make(chan error, 1)
- conf := &app.Config{}
+ conf := &plugin.Config{}
// test payload for echo
r := roadrunner.Payload{
diff --git a/plugins/app/tests/app_test.go b/plugins/server/tests/server_test.go
index 3c416b59..f917df5d 100644
--- a/plugins/app/tests/app_test.go
+++ b/plugins/server/tests/server_test.go
@@ -7,9 +7,9 @@ import (
"time"
"github.com/spiral/endure"
- "github.com/spiral/roadrunner/v2/plugins/app"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
"github.com/stretchr/testify/assert"
)
@@ -27,7 +27,7 @@ func TestAppPipes(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&app.Plugin{})
+ err = container.Register(&server.Plugin{})
if err != nil {
t.Fatal(err)
}
@@ -93,7 +93,7 @@ func TestAppSockets(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&app.Plugin{})
+ err = container.Register(&server.Plugin{})
if err != nil {
t.Fatal(err)
}
@@ -159,7 +159,7 @@ func TestAppTCP(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&app.Plugin{})
+ err = container.Register(&server.Plugin{})
if err != nil {
t.Fatal(err)
}
@@ -225,7 +225,7 @@ func TestAppWrongConfig(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&app.Plugin{})
+ err = container.Register(&server.Plugin{})
if err != nil {
t.Fatal(err)
}
@@ -257,7 +257,7 @@ func TestAppWrongRelay(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&app.Plugin{})
+ err = container.Register(&server.Plugin{})
if err != nil {
t.Fatal(err)
}
@@ -295,7 +295,7 @@ func TestAppWrongCommand(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&app.Plugin{})
+ err = container.Register(&server.Plugin{})
if err != nil {
t.Fatal(err)
}
@@ -333,7 +333,7 @@ func TestAppNoAppSectionInConfig(t *testing.T) {
t.Fatal(err)
}
- err = container.Register(&app.Plugin{})
+ err = container.Register(&server.Plugin{})
if err != nil {
t.Fatal(err)
}
diff --git a/plugins/app/tests/socket.php b/plugins/server/tests/socket.php
index 143c3ce4..143c3ce4 100644
--- a/plugins/app/tests/socket.php
+++ b/plugins/server/tests/socket.php
diff --git a/plugins/app/tests/tcp.php b/plugins/server/tests/tcp.php
index 2d6fb00a..2d6fb00a 100644
--- a/plugins/app/tests/tcp.php
+++ b/plugins/server/tests/tcp.php