summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Titov <[email protected]>2019-06-27 13:00:34 +0300
committerGitHub <[email protected]>2019-06-27 13:00:34 +0300
commitce7f331a5e6c6129331e06224865abdb1298584d (patch)
tree4fdf5b9158cf481ee77c408ea8d57d83a7e692f3
parent5b6e0a535fef745594a4966c724509dcda05b422 (diff)
parent1633d128309765536e7cbb176225926efda7a33c (diff)
Merge pull request #168 from spiral/feature/metrics
Feature/metrics
-rw-r--r--.travis.yml4
-rw-r--r--Makefile1
-rw-r--r--bin/rr (renamed from src/bin/rr)6
-rw-r--r--cmd/rr/http/metrics.go123
-rw-r--r--cmd/rr/limit/metrics.go63
-rw-r--r--cmd/rr/main.go4
-rw-r--r--composer.json2
-rw-r--r--go.mod5
-rw-r--r--service/metrics/config.go119
-rw-r--r--service/metrics/config_test.go27
-rw-r--r--service/metrics/rpc.go156
-rw-r--r--service/metrics/rpc_test.go1
-rw-r--r--service/metrics/service.go91
-rw-r--r--service/metrics/service_test.go190
-rw-r--r--src/Exception/MetricException.php17
-rw-r--r--src/HttpClient.php2
-rw-r--r--src/Metrics.php103
-rw-r--r--src/Worker.php2
18 files changed, 906 insertions, 10 deletions
diff --git a/.travis.yml b/.travis.yml
index fb1873e3..9ae4200a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -13,7 +13,7 @@ install:
- php composer-setup.php
- php composer.phar install --no-interaction --prefer-source
- find src/ -name "*.php" -print0 | xargs -0 -n1 -P8 php -l
- - chmod +x src/bin/rr && src/bin/rr get-binary
+ - chmod +x bin/rr && bin/rr get-binary
script:
- go test -race -v -coverprofile=lib.txt -covermode=atomic
@@ -25,6 +25,7 @@ script:
- go test ./service/static -race -v -coverprofile=static.txt -covermode=atomic
- go test ./service/limit -race -v -coverprofile=limit.txt -covermode=atomic
- go test ./service/headers -race -v -coverprofile=headers.txt -covermode=atomic
+ - go test ./service/metrics -race -v -coverprofile=metrics.txt -covermode=atomic
after_success:
- bash <(curl -s https://codecov.io/bash) -f lib.txt
@@ -36,6 +37,7 @@ after_success:
- bash <(curl -s https://codecov.io/bash) -f static.txt
- bash <(curl -s https://codecov.io/bash) -f limit.txt
- bash <(curl -s https://codecov.io/bash) -f headers.txt
+ - bash <(curl -s https://codecov.io/bash) -f metrics.txt
jobs:
include:
diff --git a/Makefile b/Makefile
index c8327f98..82d22bec 100644
--- a/Makefile
+++ b/Makefile
@@ -19,3 +19,4 @@ test:
go test -v -race -cover ./service/static
go test -v -race -cover ./service/limit
go test -v -race -cover ./service/headers
+ go test -v -race -cover ./service/metrics
diff --git a/src/bin/rr b/bin/rr
index 2b392467..71670ff4 100644
--- a/src/bin/rr
+++ b/bin/rr
@@ -9,8 +9,8 @@
declare(strict_types=1);
foreach ([
- __DIR__ . '/../../../../autoload.php',
- __DIR__ . '/../../vendor/autoload.php',
+ __DIR__ . '/../../../autoload.php',
+ __DIR__ . '/../vendor/autoload.php',
__DIR__ . '/vendor/autoload.php'
] as $file) {
if (file_exists($file)) {
@@ -62,7 +62,7 @@ class RRHelper
*/
public static function getVersion(): string
{
- $file = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'build.sh';
+ $file = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'build.sh';
$fileResource = fopen($file, 'r') or die(1);
while (!feof($fileResource)) {
$line = fgets($fileResource, 4096);
diff --git a/cmd/rr/http/metrics.go b/cmd/rr/http/metrics.go
new file mode 100644
index 00000000..21bbbaf1
--- /dev/null
+++ b/cmd/rr/http/metrics.go
@@ -0,0 +1,123 @@
+package http
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spf13/cobra"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ rrhttp "github.com/spiral/roadrunner/service/http"
+ "github.com/spiral/roadrunner/service/metrics"
+ "github.com/spiral/roadrunner/util"
+ "strconv"
+ "time"
+)
+
+func init() {
+ cobra.OnInitialize(func() {
+ svc, _ := rr.Container.Get(metrics.ID)
+ mtr, ok := svc.(*metrics.Service)
+ if !ok || !mtr.Enabled() {
+ return
+ }
+
+ ht, _ := rr.Container.Get(rrhttp.ID)
+ if ht, ok := ht.(*rrhttp.Service); ok {
+ collector := newCollector()
+
+ // register metrics
+ mtr.MustRegister(collector.requestCounter)
+ mtr.MustRegister(collector.requestDuration)
+ mtr.MustRegister(collector.workersMemory)
+
+ // collect events
+ ht.AddListener(collector.listener)
+
+ // update memory usage every 10 seconds
+ go collector.collectMemory(ht, time.Second*10)
+ }
+ })
+}
+
+// listener provide debug callback for system events. With colors!
+type metricCollector struct {
+ requestCounter *prometheus.CounterVec
+ requestDuration *prometheus.HistogramVec
+ workersMemory prometheus.Gauge
+}
+
+func newCollector() *metricCollector {
+ return &metricCollector{
+ requestCounter: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "rr_http_request_total",
+ Help: "Total number of handled http requests after server restart.",
+ },
+ []string{"status"},
+ ),
+ requestDuration: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Name: "rr_http_request_duration_seconds",
+ Help: "HTTP request duration.",
+ },
+ []string{"status"},
+ ),
+ workersMemory: prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: "rr_http_workers_memory_bytes",
+ Help: "Memory usage by HTTP workers.",
+ },
+ ),
+ }
+}
+
+// listener listens to http events and generates nice looking output.
+func (c *metricCollector) listener(event int, ctx interface{}) {
+ // http events
+ switch event {
+ case rrhttp.EventResponse:
+ e := ctx.(*rrhttp.ResponseEvent)
+
+ c.requestCounter.With(prometheus.Labels{
+ "status": strconv.Itoa(e.Response.Status),
+ }).Inc()
+
+ c.requestDuration.With(prometheus.Labels{
+ "status": strconv.Itoa(e.Response.Status),
+ }).Observe(e.Elapsed().Seconds())
+
+ case rrhttp.EventError:
+ e := ctx.(*rrhttp.ErrorEvent)
+
+ c.requestCounter.With(prometheus.Labels{
+ "status": "500",
+ }).Inc()
+
+ c.requestDuration.With(prometheus.Labels{
+ "status": "500",
+ }).Observe(e.Elapsed().Seconds())
+ }
+}
+
+// collect memory usage by server workers
+func (c *metricCollector) collectMemory(service *rrhttp.Service, tick time.Duration) {
+ started := false
+ for {
+ server := service.Server()
+ if server == nil && started {
+ // stopped
+ return
+ }
+
+ started = true
+
+ if workers, err := util.ServerState(server); err == nil {
+ sum := 0.0
+ for _, w := range workers {
+ sum = sum + float64(w.MemoryUsage)
+ }
+
+ c.workersMemory.Set(sum)
+ }
+
+ time.Sleep(tick)
+ }
+}
diff --git a/cmd/rr/limit/metrics.go b/cmd/rr/limit/metrics.go
new file mode 100644
index 00000000..947f53fe
--- /dev/null
+++ b/cmd/rr/limit/metrics.go
@@ -0,0 +1,63 @@
+package limit
+
+import (
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spf13/cobra"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ rrlimit "github.com/spiral/roadrunner/service/limit"
+ "github.com/spiral/roadrunner/service/metrics"
+)
+
+func init() {
+ cobra.OnInitialize(func() {
+ svc, _ := rr.Container.Get(metrics.ID)
+ mtr, ok := svc.(*metrics.Service)
+ if !ok || !mtr.Enabled() {
+ return
+ }
+
+ ht, _ := rr.Container.Get(rrlimit.ID)
+ if ht, ok := ht.(*rrlimit.Service); ok {
+ collector := newCollector()
+
+ // register metrics
+ mtr.MustRegister(collector.maxMemory)
+
+ // collect events
+ ht.AddListener(collector.listener)
+ }
+ })
+}
+
+// listener provide debug callback for system events. With colors!
+type metricCollector struct {
+ maxMemory prometheus.Counter
+ maxExecutionTime prometheus.Counter
+}
+
+func newCollector() *metricCollector {
+ return &metricCollector{
+ maxMemory: prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "rr_limit_max_memory",
+ Help: "Total number of workers that was killed because they reached max memory limit.",
+ },
+ ),
+ maxExecutionTime: prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Name: "rr_limit_max_execution_time",
+ Help: "Total number of workers that was killed because they reached max execution time limit.",
+ },
+ ),
+ }
+}
+
+// listener listens to http events and generates nice looking output.
+func (c *metricCollector) listener(event int, ctx interface{}) {
+ switch event {
+ case rrlimit.EventMaxMemory:
+ c.maxMemory.Inc()
+ case rrlimit.EventExecTTL:
+ c.maxExecutionTime.Inc()
+ }
+}
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index 6f325fb4..6fb10ba6 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -24,12 +24,13 @@ package main
import (
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/service/headers"
// services (plugins)
"github.com/spiral/roadrunner/service/env"
+ "github.com/spiral/roadrunner/service/headers"
"github.com/spiral/roadrunner/service/http"
"github.com/spiral/roadrunner/service/limit"
+ "github.com/spiral/roadrunner/service/metrics"
"github.com/spiral/roadrunner/service/rpc"
"github.com/spiral/roadrunner/service/static"
@@ -42,6 +43,7 @@ func main() {
rr.Container.Register(env.ID, &env.Service{})
rr.Container.Register(rpc.ID, &rpc.Service{})
rr.Container.Register(http.ID, &http.Service{})
+ rr.Container.Register(metrics.ID, &metrics.Service{})
rr.Container.Register(headers.ID, &headers.Service{})
rr.Container.Register(static.ID, &static.Service{})
rr.Container.Register(limit.ID, &limit.Service{})
diff --git a/composer.json b/composer.json
index f71236b1..fd50fa16 100644
--- a/composer.json
+++ b/composer.json
@@ -27,6 +27,6 @@
}
},
"bin": [
- "src/bin/rr"
+ "bin/rr"
]
}
diff --git a/go.mod b/go.mod
index 8ec6a1fb..104239f4 100644
--- a/go.mod
+++ b/go.mod
@@ -13,15 +13,16 @@ require (
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b
github.com/olekukonko/tablewriter v0.0.1
github.com/pkg/errors v0.8.1
+ github.com/prometheus/client_golang v1.0.0
github.com/shirou/gopsutil v2.17.12+incompatible
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.1
github.com/spiral/goridge v2.1.3+incompatible
- github.com/stretchr/testify v1.2.2
+ github.com/stretchr/testify v1.3.0
github.com/yookoala/gofast v0.3.0
- golang.org/x/net v0.0.0-20181017193950-04a2e542c03f
+ golang.org/x/net v0.0.0-20181114220301-adae6a3d119a
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)
diff --git a/service/metrics/config.go b/service/metrics/config.go
new file mode 100644
index 00000000..b9b21ea9
--- /dev/null
+++ b/service/metrics/config.go
@@ -0,0 +1,119 @@
+package metrics
+
+import (
+ "fmt"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/spiral/roadrunner/service"
+)
+
+type Config struct {
+ // Address to listen
+ Address string
+
+ // Collect define application specific metrics.
+ Collect map[string]Collector
+}
+
+// Collector describes single application specific metric.
+type Collector struct {
+ // Namespace of the metric.
+ Namespace string
+
+ // Subsystem of the metric.
+ Subsystem string
+
+ // Collector type (histogram, gauge, counter, summary).
+ Type string
+
+ // Help of collector.
+ Help string
+
+ // Labels for vectorized metrics.
+ Labels []string
+
+ // Buckets for histogram metric.
+ Buckets []float64
+}
+
+// Hydrate configuration.
+func (c *Config) Hydrate(cfg service.Config) error {
+ return cfg.Unmarshal(c)
+}
+
+// register application specific metrics.
+func (c *Config) initCollectors() (map[string]prometheus.Collector, error) {
+ if c.Collect == nil {
+ return nil, nil
+ }
+
+ collectors := make(map[string]prometheus.Collector)
+
+ for name, m := range c.Collect {
+ var collector prometheus.Collector
+ switch m.Type {
+ case "histogram":
+ opts := prometheus.HistogramOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ Buckets: m.Buckets,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewHistogramVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewHistogram(opts)
+ }
+ case "gauge":
+ opts := prometheus.GaugeOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewGaugeVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewGauge(opts)
+ }
+ case "counter":
+ opts := prometheus.CounterOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewCounterVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewCounter(opts)
+ }
+ case "summary":
+ opts := prometheus.SummaryOpts{
+ Name: name,
+ Namespace: m.Namespace,
+ Subsystem: m.Subsystem,
+ Help: m.Help,
+ }
+
+ if len(m.Labels) != 0 {
+ collector = prometheus.NewSummaryVec(opts, m.Labels)
+ } else {
+ collector = prometheus.NewSummary(opts)
+ }
+ default:
+ return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name)
+ }
+
+ if err := prometheus.Register(collector); err != nil {
+ return nil, err
+ }
+
+ collectors[name] = collector
+ }
+
+ return collectors, nil
+}
diff --git a/service/metrics/config_test.go b/service/metrics/config_test.go
new file mode 100644
index 00000000..bd02d1cf
--- /dev/null
+++ b/service/metrics/config_test.go
@@ -0,0 +1,27 @@
+package metrics
+
+import (
+ "encoding/json"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_Config_Hydrate_Error1(t *testing.T) {
+ cfg := &mockCfg{`{"request": {"From": "Something"}}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Error2(t *testing.T) {
+ cfg := &mockCfg{`{"dir": "/dir/"`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
diff --git a/service/metrics/rpc.go b/service/metrics/rpc.go
new file mode 100644
index 00000000..30ad6c62
--- /dev/null
+++ b/service/metrics/rpc.go
@@ -0,0 +1,156 @@
+package metrics
+
+import (
+ "fmt"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+type rpcServer struct{ svc *Service }
+
+// Metric represent single metric produced by the application.
+type Metric struct {
+ // Collector name.
+ Name string
+
+ // Collector value.
+ Value float64
+
+ // Labels associated with metric. Only for vector metrics.
+ Labels []string
+}
+
+// Add new metric to the designated collector.
+func (rpc *rpcServer) Add(m *Metric, ok *bool) error {
+ c := rpc.svc.Collector(m.Name)
+ if c == nil {
+ return fmt.Errorf("undefined collector `%s`", m.Name)
+ }
+
+ switch c.(type) {
+ case prometheus.Gauge:
+ c.(prometheus.Gauge).Add(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Add(m.Value)
+
+ case prometheus.Counter:
+ c.(prometheus.Counter).Add(m.Value)
+
+ case *prometheus.CounterVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.CounterVec).WithLabelValues(m.Labels...).Add(m.Value)
+
+ case prometheus.Summary:
+ c.(prometheus.Counter).Add(m.Value)
+
+ case *prometheus.SummaryVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value)
+
+ case prometheus.Histogram:
+ c.(prometheus.Histogram).Observe(m.Value)
+
+ case *prometheus.HistogramVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value)
+ }
+
+ *ok = true
+ return nil
+}
+
+// Sub subtract the value from the specific metric (gauge only).
+func (rpc *rpcServer) Sub(m *Metric, ok *bool) error {
+ c := rpc.svc.Collector(m.Name)
+ if c == nil {
+ return fmt.Errorf("undefined collector `%s`", m.Name)
+ }
+
+ switch c.(type) {
+ case prometheus.Gauge:
+ c.(prometheus.Gauge).Sub(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Sub(m.Value)
+ default:
+ return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name)
+ }
+
+ *ok = true
+ return nil
+}
+
+// Observe the value (histogram and summary only).
+func (rpc *rpcServer) Observe(m *Metric, ok *bool) error {
+ c := rpc.svc.Collector(m.Name)
+ if c == nil {
+ return fmt.Errorf("undefined collector `%s`", m.Name)
+ }
+
+ switch c.(type) {
+ case *prometheus.SummaryVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value)
+
+ case prometheus.Histogram:
+ c.(prometheus.Histogram).Observe(m.Value)
+
+ case *prometheus.HistogramVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value)
+ default:
+ return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name)
+ }
+
+ *ok = true
+ return nil
+}
+
+// Set the metric value (only for gaude).
+func (rpc *rpcServer) Set(m *Metric, ok *bool) error {
+ c := rpc.svc.Collector(m.Name)
+ if c == nil {
+ return fmt.Errorf("undefined collector `%s`", m.Name)
+ }
+
+ switch c.(type) {
+ case prometheus.Gauge:
+ c.(prometheus.Gauge).Set(m.Value)
+
+ case *prometheus.GaugeVec:
+ if len(m.Labels) == 0 {
+ return fmt.Errorf("required labels for collector `%s`", m.Name)
+ }
+
+ c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Set(m.Value)
+
+ default:
+ return fmt.Errorf("collector `%s` does not support method `Set`", m.Name)
+ }
+
+ *ok = true
+ return nil
+}
diff --git a/service/metrics/rpc_test.go b/service/metrics/rpc_test.go
new file mode 100644
index 00000000..1abe097a
--- /dev/null
+++ b/service/metrics/rpc_test.go
@@ -0,0 +1 @@
+package metrics
diff --git a/service/metrics/service.go b/service/metrics/service.go
new file mode 100644
index 00000000..2c94568d
--- /dev/null
+++ b/service/metrics/service.go
@@ -0,0 +1,91 @@
+package metrics
+
+import (
+ "context"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/spiral/roadrunner/service/rpc"
+ "net/http"
+ "sync"
+)
+
+// ID declares public service name.
+const ID = "metrics"
+
+// Service to manage application metrics using Prometheus.
+type Service struct {
+ cfg *Config
+ mu sync.Mutex
+ http *http.Server
+ collectors sync.Map
+}
+
+// Init service.
+func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) {
+ s.cfg = cfg
+
+ if r != nil {
+ if err := r.Register(ID, &rpcServer{s}); err != nil {
+ return false, err
+ }
+ }
+
+ return true, nil
+}
+
+// Enabled indicates that server is able to collect metrics.
+func (s *Service) Enabled() bool {
+ return s.cfg != nil
+}
+
+// Register new prometheus collector.
+func (s *Service) Register(c prometheus.Collector) error {
+ return prometheus.Register(c)
+}
+
+// MustRegister registers new collector or fails with panic.
+func (s *Service) MustRegister(c prometheus.Collector) {
+ if err := prometheus.Register(c); err != nil {
+ panic(err)
+ }
+}
+
+// Serve prometheus metrics service.
+func (s *Service) Serve() error {
+ // register application specific metrics
+ collectors, err := s.cfg.initCollectors()
+ if err != nil {
+ return err
+ }
+
+ for name, collector := range collectors {
+ s.collectors.Store(name, collector)
+ }
+
+ s.mu.Lock()
+ s.http = &http.Server{Addr: s.cfg.Address, Handler: promhttp.Handler()}
+ s.mu.Unlock()
+
+ return s.http.ListenAndServe()
+}
+
+// Stop prometheus metrics service.
+func (s *Service) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.http != nil {
+ // gracefully stop server
+ go s.http.Shutdown(context.Background())
+ }
+}
+
+// Collector returns application specific collector by name or nil if collector not found.
+func (s *Service) Collector(name string) prometheus.Collector {
+ collector, ok := s.collectors.Load(name)
+ if !ok {
+ return nil
+ }
+
+ return collector.(prometheus.Collector)
+}
diff --git a/service/metrics/service_test.go b/service/metrics/service_test.go
new file mode 100644
index 00000000..513b3042
--- /dev/null
+++ b/service/metrics/service_test.go
@@ -0,0 +1,190 @@
+package metrics
+
+import (
+ "encoding/json"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+ "io/ioutil"
+ "net/http"
+ "testing"
+ "time"
+)
+
+type testCfg struct {
+ metricsCfg string
+ target string
+}
+
+func (cfg *testCfg) Get(name string) service.Config {
+ if name == ID {
+ return &testCfg{target: cfg.metricsCfg}
+ }
+
+ return nil
+}
+
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ err := json.Unmarshal([]byte(cfg.target), out)
+ return err
+}
+
+// get request and return body
+func get(url string) (string, *http.Response, error) {
+ r, err := http.Get(url)
+ if err != nil {
+ return "", nil, err
+ }
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ return string(b), r, err
+}
+
+func TestService_Serve(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112"
+ }`}))
+
+ s, _ := c.Get(ID)
+ assert.NotNil(t, s)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ out, _, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+
+ assert.Contains(t, out, "go_gc_duration_seconds")
+}
+
+func Test_ServiceCustomMetric(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112"
+ }`}))
+
+ s, _ := c.Get(ID)
+ assert.NotNil(t, s)
+
+ collector := prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "my_gauge",
+ Help: "My gauge value",
+ })
+
+ assert.NoError(t, s.(*Service).Register(collector))
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ collector.Set(100)
+
+ out, _, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+
+ assert.Contains(t, out, "my_gauge 100")
+}
+
+func Test_ServiceCustomMetricMust(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112"
+ }`}))
+
+ s, _ := c.Get(ID)
+ assert.NotNil(t, s)
+
+ collector := prometheus.NewGauge(prometheus.GaugeOpts{
+ Name: "my_gauge_2",
+ Help: "My gauge value",
+ })
+
+ s.(*Service).MustRegister(collector)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ collector.Set(100)
+
+ out, _, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+
+ assert.Contains(t, out, "my_gauge_2 100")
+}
+
+func Test_ConfiguredMetric(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112",
+ "collect":{
+ "user_gauge":{
+ "type": "gauge"
+ }
+ }
+
+ }`}))
+
+ s, _ := c.Get(ID)
+ assert.NotNil(t, s)
+
+ assert.True(t, s.(*Service).Enabled())
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 100)
+ defer c.Stop()
+
+ s.(*Service).Collector("user_gauge").(prometheus.Gauge).Set(100)
+
+ assert.Nil(t, s.(*Service).Collector("invalid"))
+
+ out, _, err := get("http://localhost:2112/metrics")
+ assert.NoError(t, err)
+
+ assert.Contains(t, out, "user_gauge 100")
+}
+
+func Test_ConfiguredInvalidMetric(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{metricsCfg: `{
+ "address": "localhost:2112",
+ "collect":{
+ "user_gauge":{
+ "type": "invalid"
+ }
+ }
+
+ }`}))
+
+ assert.Error(t, c.Serve())
+}
diff --git a/src/Exception/MetricException.php b/src/Exception/MetricException.php
new file mode 100644
index 00000000..2d12eefe
--- /dev/null
+++ b/src/Exception/MetricException.php
@@ -0,0 +1,17 @@
+<?php
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Exception;
+
+use Spiral\Goridge\Exceptions\RPCException;
+
+class MetricException extends RPCException
+{
+
+} \ No newline at end of file
diff --git a/src/HttpClient.php b/src/HttpClient.php
index f31a9b50..d2862ad8 100644
--- a/src/HttpClient.php
+++ b/src/HttpClient.php
@@ -8,7 +8,7 @@ declare(strict_types=1);
namespace Spiral\RoadRunner;
-class HttpClient
+final class HttpClient
{
/** @var Worker */
private $worker;
diff --git a/src/Metrics.php b/src/Metrics.php
new file mode 100644
index 00000000..c07a1e7a
--- /dev/null
+++ b/src/Metrics.php
@@ -0,0 +1,103 @@
+<?php
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\Goridge\Exceptions\RPCException;
+use Spiral\Goridge\RPC;
+use Spiral\RoadRunner\Exception\MetricException;
+
+/**
+ * Application metrics.
+ */
+final class Metrics
+{
+ /** @var RPC */
+ private $rpc;
+
+ /**
+ * @param RPC $rpc
+ */
+ public function __construct(RPC $rpc)
+ {
+ $this->rpc = $rpc;
+ }
+
+ /**
+ * Add collector value. Fallback to appropriate method of related collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param array $labels
+ *
+ * @throws MetricException
+ */
+ public function add(string $collector, float $value, array $labels = [])
+ {
+ try {
+ $this->rpc->call('metrics.Add', compact('collector', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * Subtract the collector value, only for gauge collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param array $labels
+ *
+ * @throws MetricException
+ */
+ public function sub(string $collector, float $value, array $labels = [])
+ {
+ try {
+ $this->rpc->call('metrics.Sub', compact('collector', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * Observe collector value, only for histogram and summary collectors.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param array $labels
+ *
+ * @throws MetricException
+ */
+ public function observe(string $collector, float $value, array $labels = [])
+ {
+ try {
+ $this->rpc->call('metrics.Observe', compact('collector', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * Set collector value, only for gauge collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param array $labels
+ *
+ * @throws MetricException
+ */
+ public function set(string $collector, float $value, array $labels = [])
+ {
+ try {
+ $this->rpc->call('metrics.Set', compact('collector', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/Worker.php b/src/Worker.php
index b67ebd3b..00cf072a 100644
--- a/src/Worker.php
+++ b/src/Worker.php
@@ -22,7 +22,7 @@ use Spiral\RoadRunner\Exception\RoadRunnerException;
* $worker->send("DONE", json_encode($context));
* }
*/
-class Worker
+final class Worker
{
// Send as response context to request worker termination
const STOP = '{"stop":true}';