diff options
author | Anton Titov <[email protected]> | 2019-06-27 13:00:34 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2019-06-27 13:00:34 +0300 |
commit | ce7f331a5e6c6129331e06224865abdb1298584d (patch) | |
tree | 4fdf5b9158cf481ee77c408ea8d57d83a7e692f3 | |
parent | 5b6e0a535fef745594a4966c724509dcda05b422 (diff) | |
parent | 1633d128309765536e7cbb176225926efda7a33c (diff) |
Merge pull request #168 from spiral/feature/metrics
Feature/metrics
-rw-r--r-- | .travis.yml | 4 | ||||
-rw-r--r-- | Makefile | 1 | ||||
-rw-r--r-- | bin/rr (renamed from src/bin/rr) | 6 | ||||
-rw-r--r-- | cmd/rr/http/metrics.go | 123 | ||||
-rw-r--r-- | cmd/rr/limit/metrics.go | 63 | ||||
-rw-r--r-- | cmd/rr/main.go | 4 | ||||
-rw-r--r-- | composer.json | 2 | ||||
-rw-r--r-- | go.mod | 5 | ||||
-rw-r--r-- | service/metrics/config.go | 119 | ||||
-rw-r--r-- | service/metrics/config_test.go | 27 | ||||
-rw-r--r-- | service/metrics/rpc.go | 156 | ||||
-rw-r--r-- | service/metrics/rpc_test.go | 1 | ||||
-rw-r--r-- | service/metrics/service.go | 91 | ||||
-rw-r--r-- | service/metrics/service_test.go | 190 | ||||
-rw-r--r-- | src/Exception/MetricException.php | 17 | ||||
-rw-r--r-- | src/HttpClient.php | 2 | ||||
-rw-r--r-- | src/Metrics.php | 103 | ||||
-rw-r--r-- | src/Worker.php | 2 |
18 files changed, 906 insertions, 10 deletions
diff --git a/.travis.yml b/.travis.yml index fb1873e3..9ae4200a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,7 +13,7 @@ install: - php composer-setup.php - php composer.phar install --no-interaction --prefer-source - find src/ -name "*.php" -print0 | xargs -0 -n1 -P8 php -l - - chmod +x src/bin/rr && src/bin/rr get-binary + - chmod +x bin/rr && bin/rr get-binary script: - go test -race -v -coverprofile=lib.txt -covermode=atomic @@ -25,6 +25,7 @@ script: - go test ./service/static -race -v -coverprofile=static.txt -covermode=atomic - go test ./service/limit -race -v -coverprofile=limit.txt -covermode=atomic - go test ./service/headers -race -v -coverprofile=headers.txt -covermode=atomic + - go test ./service/metrics -race -v -coverprofile=metrics.txt -covermode=atomic after_success: - bash <(curl -s https://codecov.io/bash) -f lib.txt @@ -36,6 +37,7 @@ after_success: - bash <(curl -s https://codecov.io/bash) -f static.txt - bash <(curl -s https://codecov.io/bash) -f limit.txt - bash <(curl -s https://codecov.io/bash) -f headers.txt + - bash <(curl -s https://codecov.io/bash) -f metrics.txt jobs: include: @@ -19,3 +19,4 @@ test: go test -v -race -cover ./service/static go test -v -race -cover ./service/limit go test -v -race -cover ./service/headers + go test -v -race -cover ./service/metrics @@ -9,8 +9,8 @@ declare(strict_types=1); foreach ([ - __DIR__ . '/../../../../autoload.php', - __DIR__ . '/../../vendor/autoload.php', + __DIR__ . '/../../../autoload.php', + __DIR__ . '/../vendor/autoload.php', __DIR__ . '/vendor/autoload.php' ] as $file) { if (file_exists($file)) { @@ -62,7 +62,7 @@ class RRHelper */ public static function getVersion(): string { - $file = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'build.sh'; + $file = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'build.sh'; $fileResource = fopen($file, 'r') or die(1); while (!feof($fileResource)) { $line = fgets($fileResource, 4096); diff --git a/cmd/rr/http/metrics.go b/cmd/rr/http/metrics.go new file mode 100644 index 00000000..21bbbaf1 --- /dev/null +++ b/cmd/rr/http/metrics.go @@ -0,0 +1,123 @@ +package http + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cobra" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + rrhttp "github.com/spiral/roadrunner/service/http" + "github.com/spiral/roadrunner/service/metrics" + "github.com/spiral/roadrunner/util" + "strconv" + "time" +) + +func init() { + cobra.OnInitialize(func() { + svc, _ := rr.Container.Get(metrics.ID) + mtr, ok := svc.(*metrics.Service) + if !ok || !mtr.Enabled() { + return + } + + ht, _ := rr.Container.Get(rrhttp.ID) + if ht, ok := ht.(*rrhttp.Service); ok { + collector := newCollector() + + // register metrics + mtr.MustRegister(collector.requestCounter) + mtr.MustRegister(collector.requestDuration) + mtr.MustRegister(collector.workersMemory) + + // collect events + ht.AddListener(collector.listener) + + // update memory usage every 10 seconds + go collector.collectMemory(ht, time.Second*10) + } + }) +} + +// listener provide debug callback for system events. With colors! +type metricCollector struct { + requestCounter *prometheus.CounterVec + requestDuration *prometheus.HistogramVec + workersMemory prometheus.Gauge +} + +func newCollector() *metricCollector { + return &metricCollector{ + requestCounter: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "rr_http_request_total", + Help: "Total number of handled http requests after server restart.", + }, + []string{"status"}, + ), + requestDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "rr_http_request_duration_seconds", + Help: "HTTP request duration.", + }, + []string{"status"}, + ), + workersMemory: prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "rr_http_workers_memory_bytes", + Help: "Memory usage by HTTP workers.", + }, + ), + } +} + +// listener listens to http events and generates nice looking output. +func (c *metricCollector) listener(event int, ctx interface{}) { + // http events + switch event { + case rrhttp.EventResponse: + e := ctx.(*rrhttp.ResponseEvent) + + c.requestCounter.With(prometheus.Labels{ + "status": strconv.Itoa(e.Response.Status), + }).Inc() + + c.requestDuration.With(prometheus.Labels{ + "status": strconv.Itoa(e.Response.Status), + }).Observe(e.Elapsed().Seconds()) + + case rrhttp.EventError: + e := ctx.(*rrhttp.ErrorEvent) + + c.requestCounter.With(prometheus.Labels{ + "status": "500", + }).Inc() + + c.requestDuration.With(prometheus.Labels{ + "status": "500", + }).Observe(e.Elapsed().Seconds()) + } +} + +// collect memory usage by server workers +func (c *metricCollector) collectMemory(service *rrhttp.Service, tick time.Duration) { + started := false + for { + server := service.Server() + if server == nil && started { + // stopped + return + } + + started = true + + if workers, err := util.ServerState(server); err == nil { + sum := 0.0 + for _, w := range workers { + sum = sum + float64(w.MemoryUsage) + } + + c.workersMemory.Set(sum) + } + + time.Sleep(tick) + } +} diff --git a/cmd/rr/limit/metrics.go b/cmd/rr/limit/metrics.go new file mode 100644 index 00000000..947f53fe --- /dev/null +++ b/cmd/rr/limit/metrics.go @@ -0,0 +1,63 @@ +package limit + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cobra" + rr "github.com/spiral/roadrunner/cmd/rr/cmd" + rrlimit "github.com/spiral/roadrunner/service/limit" + "github.com/spiral/roadrunner/service/metrics" +) + +func init() { + cobra.OnInitialize(func() { + svc, _ := rr.Container.Get(metrics.ID) + mtr, ok := svc.(*metrics.Service) + if !ok || !mtr.Enabled() { + return + } + + ht, _ := rr.Container.Get(rrlimit.ID) + if ht, ok := ht.(*rrlimit.Service); ok { + collector := newCollector() + + // register metrics + mtr.MustRegister(collector.maxMemory) + + // collect events + ht.AddListener(collector.listener) + } + }) +} + +// listener provide debug callback for system events. With colors! +type metricCollector struct { + maxMemory prometheus.Counter + maxExecutionTime prometheus.Counter +} + +func newCollector() *metricCollector { + return &metricCollector{ + maxMemory: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "rr_limit_max_memory", + Help: "Total number of workers that was killed because they reached max memory limit.", + }, + ), + maxExecutionTime: prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "rr_limit_max_execution_time", + Help: "Total number of workers that was killed because they reached max execution time limit.", + }, + ), + } +} + +// listener listens to http events and generates nice looking output. +func (c *metricCollector) listener(event int, ctx interface{}) { + switch event { + case rrlimit.EventMaxMemory: + c.maxMemory.Inc() + case rrlimit.EventExecTTL: + c.maxExecutionTime.Inc() + } +} diff --git a/cmd/rr/main.go b/cmd/rr/main.go index 6f325fb4..6fb10ba6 100644 --- a/cmd/rr/main.go +++ b/cmd/rr/main.go @@ -24,12 +24,13 @@ package main import ( rr "github.com/spiral/roadrunner/cmd/rr/cmd" - "github.com/spiral/roadrunner/service/headers" // services (plugins) "github.com/spiral/roadrunner/service/env" + "github.com/spiral/roadrunner/service/headers" "github.com/spiral/roadrunner/service/http" "github.com/spiral/roadrunner/service/limit" + "github.com/spiral/roadrunner/service/metrics" "github.com/spiral/roadrunner/service/rpc" "github.com/spiral/roadrunner/service/static" @@ -42,6 +43,7 @@ func main() { rr.Container.Register(env.ID, &env.Service{}) rr.Container.Register(rpc.ID, &rpc.Service{}) rr.Container.Register(http.ID, &http.Service{}) + rr.Container.Register(metrics.ID, &metrics.Service{}) rr.Container.Register(headers.ID, &headers.Service{}) rr.Container.Register(static.ID, &static.Service{}) rr.Container.Register(limit.ID, &limit.Service{}) diff --git a/composer.json b/composer.json index f71236b1..fd50fa16 100644 --- a/composer.json +++ b/composer.json @@ -27,6 +27,6 @@ } }, "bin": [ - "src/bin/rr" + "bin/rr" ] } @@ -13,15 +13,16 @@ require ( github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b github.com/olekukonko/tablewriter v0.0.1 github.com/pkg/errors v0.8.1 + github.com/prometheus/client_golang v1.0.0 github.com/shirou/gopsutil v2.17.12+incompatible github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect github.com/sirupsen/logrus v1.3.0 github.com/spf13/cobra v0.0.3 github.com/spf13/viper v1.3.1 github.com/spiral/goridge v2.1.3+incompatible - github.com/stretchr/testify v1.2.2 + github.com/stretchr/testify v1.3.0 github.com/yookoala/gofast v0.3.0 - golang.org/x/net v0.0.0-20181017193950-04a2e542c03f + golang.org/x/net v0.0.0-20181114220301-adae6a3d119a golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) diff --git a/service/metrics/config.go b/service/metrics/config.go new file mode 100644 index 00000000..b9b21ea9 --- /dev/null +++ b/service/metrics/config.go @@ -0,0 +1,119 @@ +package metrics + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/spiral/roadrunner/service" +) + +type Config struct { + // Address to listen + Address string + + // Collect define application specific metrics. + Collect map[string]Collector +} + +// Collector describes single application specific metric. +type Collector struct { + // Namespace of the metric. + Namespace string + + // Subsystem of the metric. + Subsystem string + + // Collector type (histogram, gauge, counter, summary). + Type string + + // Help of collector. + Help string + + // Labels for vectorized metrics. + Labels []string + + // Buckets for histogram metric. + Buckets []float64 +} + +// Hydrate configuration. +func (c *Config) Hydrate(cfg service.Config) error { + return cfg.Unmarshal(c) +} + +// register application specific metrics. +func (c *Config) initCollectors() (map[string]prometheus.Collector, error) { + if c.Collect == nil { + return nil, nil + } + + collectors := make(map[string]prometheus.Collector) + + for name, m := range c.Collect { + var collector prometheus.Collector + switch m.Type { + case "histogram": + opts := prometheus.HistogramOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + Buckets: m.Buckets, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewHistogramVec(opts, m.Labels) + } else { + collector = prometheus.NewHistogram(opts) + } + case "gauge": + opts := prometheus.GaugeOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewGaugeVec(opts, m.Labels) + } else { + collector = prometheus.NewGauge(opts) + } + case "counter": + opts := prometheus.CounterOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewCounterVec(opts, m.Labels) + } else { + collector = prometheus.NewCounter(opts) + } + case "summary": + opts := prometheus.SummaryOpts{ + Name: name, + Namespace: m.Namespace, + Subsystem: m.Subsystem, + Help: m.Help, + } + + if len(m.Labels) != 0 { + collector = prometheus.NewSummaryVec(opts, m.Labels) + } else { + collector = prometheus.NewSummary(opts) + } + default: + return nil, fmt.Errorf("invalid metric type `%s` for `%s`", m.Type, name) + } + + if err := prometheus.Register(collector); err != nil { + return nil, err + } + + collectors[name] = collector + } + + return collectors, nil +} diff --git a/service/metrics/config_test.go b/service/metrics/config_test.go new file mode 100644 index 00000000..bd02d1cf --- /dev/null +++ b/service/metrics/config_test.go @@ -0,0 +1,27 @@ +package metrics + +import ( + "encoding/json" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "testing" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error1(t *testing.T) { + cfg := &mockCfg{`{"request": {"From": "Something"}}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} diff --git a/service/metrics/rpc.go b/service/metrics/rpc.go new file mode 100644 index 00000000..30ad6c62 --- /dev/null +++ b/service/metrics/rpc.go @@ -0,0 +1,156 @@ +package metrics + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" +) + +type rpcServer struct{ svc *Service } + +// Metric represent single metric produced by the application. +type Metric struct { + // Collector name. + Name string + + // Collector value. + Value float64 + + // Labels associated with metric. Only for vector metrics. + Labels []string +} + +// Add new metric to the designated collector. +func (rpc *rpcServer) Add(m *Metric, ok *bool) error { + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c.(type) { + case prometheus.Gauge: + c.(prometheus.Gauge).Add(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Add(m.Value) + + case prometheus.Counter: + c.(prometheus.Counter).Add(m.Value) + + case *prometheus.CounterVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.CounterVec).WithLabelValues(m.Labels...).Add(m.Value) + + case prometheus.Summary: + c.(prometheus.Counter).Add(m.Value) + + case *prometheus.SummaryVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value) + + case prometheus.Histogram: + c.(prometheus.Histogram).Observe(m.Value) + + case *prometheus.HistogramVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value) + } + + *ok = true + return nil +} + +// Sub subtract the value from the specific metric (gauge only). +func (rpc *rpcServer) Sub(m *Metric, ok *bool) error { + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c.(type) { + case prometheus.Gauge: + c.(prometheus.Gauge).Sub(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Sub(m.Value) + default: + return fmt.Errorf("collector `%s` does not support method `Sub`", m.Name) + } + + *ok = true + return nil +} + +// Observe the value (histogram and summary only). +func (rpc *rpcServer) Observe(m *Metric, ok *bool) error { + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c.(type) { + case *prometheus.SummaryVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.SummaryVec).WithLabelValues(m.Labels...).Observe(m.Value) + + case prometheus.Histogram: + c.(prometheus.Histogram).Observe(m.Value) + + case *prometheus.HistogramVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.HistogramVec).WithLabelValues(m.Labels...).Observe(m.Value) + default: + return fmt.Errorf("collector `%s` does not support method `Observe`", m.Name) + } + + *ok = true + return nil +} + +// Set the metric value (only for gaude). +func (rpc *rpcServer) Set(m *Metric, ok *bool) error { + c := rpc.svc.Collector(m.Name) + if c == nil { + return fmt.Errorf("undefined collector `%s`", m.Name) + } + + switch c.(type) { + case prometheus.Gauge: + c.(prometheus.Gauge).Set(m.Value) + + case *prometheus.GaugeVec: + if len(m.Labels) == 0 { + return fmt.Errorf("required labels for collector `%s`", m.Name) + } + + c.(*prometheus.GaugeVec).WithLabelValues(m.Labels...).Set(m.Value) + + default: + return fmt.Errorf("collector `%s` does not support method `Set`", m.Name) + } + + *ok = true + return nil +} diff --git a/service/metrics/rpc_test.go b/service/metrics/rpc_test.go new file mode 100644 index 00000000..1abe097a --- /dev/null +++ b/service/metrics/rpc_test.go @@ -0,0 +1 @@ +package metrics diff --git a/service/metrics/service.go b/service/metrics/service.go new file mode 100644 index 00000000..2c94568d --- /dev/null +++ b/service/metrics/service.go @@ -0,0 +1,91 @@ +package metrics + +import ( + "context" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spiral/roadrunner/service/rpc" + "net/http" + "sync" +) + +// ID declares public service name. +const ID = "metrics" + +// Service to manage application metrics using Prometheus. +type Service struct { + cfg *Config + mu sync.Mutex + http *http.Server + collectors sync.Map +} + +// Init service. +func (s *Service) Init(cfg *Config, r *rpc.Service) (bool, error) { + s.cfg = cfg + + if r != nil { + if err := r.Register(ID, &rpcServer{s}); err != nil { + return false, err + } + } + + return true, nil +} + +// Enabled indicates that server is able to collect metrics. +func (s *Service) Enabled() bool { + return s.cfg != nil +} + +// Register new prometheus collector. +func (s *Service) Register(c prometheus.Collector) error { + return prometheus.Register(c) +} + +// MustRegister registers new collector or fails with panic. +func (s *Service) MustRegister(c prometheus.Collector) { + if err := prometheus.Register(c); err != nil { + panic(err) + } +} + +// Serve prometheus metrics service. +func (s *Service) Serve() error { + // register application specific metrics + collectors, err := s.cfg.initCollectors() + if err != nil { + return err + } + + for name, collector := range collectors { + s.collectors.Store(name, collector) + } + + s.mu.Lock() + s.http = &http.Server{Addr: s.cfg.Address, Handler: promhttp.Handler()} + s.mu.Unlock() + + return s.http.ListenAndServe() +} + +// Stop prometheus metrics service. +func (s *Service) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.http != nil { + // gracefully stop server + go s.http.Shutdown(context.Background()) + } +} + +// Collector returns application specific collector by name or nil if collector not found. +func (s *Service) Collector(name string) prometheus.Collector { + collector, ok := s.collectors.Load(name) + if !ok { + return nil + } + + return collector.(prometheus.Collector) +} diff --git a/service/metrics/service_test.go b/service/metrics/service_test.go new file mode 100644 index 00000000..513b3042 --- /dev/null +++ b/service/metrics/service_test.go @@ -0,0 +1,190 @@ +package metrics + +import ( + "encoding/json" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" + "io/ioutil" + "net/http" + "testing" + "time" +) + +type testCfg struct { + metricsCfg string + target string +} + +func (cfg *testCfg) Get(name string) service.Config { + if name == ID { + return &testCfg{target: cfg.metricsCfg} + } + + return nil +} + +func (cfg *testCfg) Unmarshal(out interface{}) error { + err := json.Unmarshal([]byte(cfg.target), out) + return err +} + +// get request and return body +func get(url string) (string, *http.Response, error) { + r, err := http.Get(url) + if err != nil { + return "", nil, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + return string(b), r, err +} + +func TestService_Serve(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112" + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + out, _, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "go_gc_duration_seconds") +} + +func Test_ServiceCustomMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112" + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + collector := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "my_gauge", + Help: "My gauge value", + }) + + assert.NoError(t, s.(*Service).Register(collector)) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + collector.Set(100) + + out, _, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "my_gauge 100") +} + +func Test_ServiceCustomMetricMust(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112" + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + collector := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "my_gauge_2", + Help: "My gauge value", + }) + + s.(*Service).MustRegister(collector) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + collector.Set(100) + + out, _, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "my_gauge_2 100") +} + +func Test_ConfiguredMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112", + "collect":{ + "user_gauge":{ + "type": "gauge" + } + } + + }`})) + + s, _ := c.Get(ID) + assert.NotNil(t, s) + + assert.True(t, s.(*Service).Enabled()) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + s.(*Service).Collector("user_gauge").(prometheus.Gauge).Set(100) + + assert.Nil(t, s.(*Service).Collector("invalid")) + + out, _, err := get("http://localhost:2112/metrics") + assert.NoError(t, err) + + assert.Contains(t, out, "user_gauge 100") +} + +func Test_ConfiguredInvalidMetric(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{metricsCfg: `{ + "address": "localhost:2112", + "collect":{ + "user_gauge":{ + "type": "invalid" + } + } + + }`})) + + assert.Error(t, c.Serve()) +} diff --git a/src/Exception/MetricException.php b/src/Exception/MetricException.php new file mode 100644 index 00000000..2d12eefe --- /dev/null +++ b/src/Exception/MetricException.php @@ -0,0 +1,17 @@ +<?php +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner\Exception; + +use Spiral\Goridge\Exceptions\RPCException; + +class MetricException extends RPCException +{ + +}
\ No newline at end of file diff --git a/src/HttpClient.php b/src/HttpClient.php index f31a9b50..d2862ad8 100644 --- a/src/HttpClient.php +++ b/src/HttpClient.php @@ -8,7 +8,7 @@ declare(strict_types=1); namespace Spiral\RoadRunner; -class HttpClient +final class HttpClient { /** @var Worker */ private $worker; diff --git a/src/Metrics.php b/src/Metrics.php new file mode 100644 index 00000000..c07a1e7a --- /dev/null +++ b/src/Metrics.php @@ -0,0 +1,103 @@ +<?php +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exceptions\RPCException; +use Spiral\Goridge\RPC; +use Spiral\RoadRunner\Exception\MetricException; + +/** + * Application metrics. + */ +final class Metrics +{ + /** @var RPC */ + private $rpc; + + /** + * @param RPC $rpc + */ + public function __construct(RPC $rpc) + { + $this->rpc = $rpc; + } + + /** + * Add collector value. Fallback to appropriate method of related collector. + * + * @param string $collector + * @param float $value + * @param array $labels + * + * @throws MetricException + */ + public function add(string $collector, float $value, array $labels = []) + { + try { + $this->rpc->call('metrics.Add', compact('collector', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * Subtract the collector value, only for gauge collector. + * + * @param string $collector + * @param float $value + * @param array $labels + * + * @throws MetricException + */ + public function sub(string $collector, float $value, array $labels = []) + { + try { + $this->rpc->call('metrics.Sub', compact('collector', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * Observe collector value, only for histogram and summary collectors. + * + * @param string $collector + * @param float $value + * @param array $labels + * + * @throws MetricException + */ + public function observe(string $collector, float $value, array $labels = []) + { + try { + $this->rpc->call('metrics.Observe', compact('collector', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * Set collector value, only for gauge collector. + * + * @param string $collector + * @param float $value + * @param array $labels + * + * @throws MetricException + */ + public function set(string $collector, float $value, array $labels = []) + { + try { + $this->rpc->call('metrics.Set', compact('collector', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } +}
\ No newline at end of file diff --git a/src/Worker.php b/src/Worker.php index b67ebd3b..00cf072a 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -22,7 +22,7 @@ use Spiral\RoadRunner\Exception\RoadRunnerException; * $worker->send("DONE", json_encode($context)); * } */ -class Worker +final class Worker { // Send as response context to request worker termination const STOP = '{"stop":true}'; |