summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/worker_handler/handler.go8
-rw-r--r--plugins/http/metrics.go92
-rw-r--r--plugins/http/plugin.go15
-rw-r--r--plugins/metrics/plugin.go91
-rw-r--r--tests/plugins/metrics/configs/.rr-http-metrics.yaml20
-rw-r--r--tests/plugins/metrics/metrics_test.go95
6 files changed, 276 insertions, 45 deletions
diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go
index 0ff23d9d..2534a331 100644
--- a/pkg/worker_handler/handler.go
+++ b/pkg/worker_handler/handler.go
@@ -63,7 +63,7 @@ type Handler struct {
log logger.Logger
pool pool.Pool
mul sync.Mutex
- lsn events.Listener
+ lsn []events.Listener
internalHTTPCode uint64
}
@@ -82,7 +82,7 @@ func NewHandler(maxReqSize uint64, internalHTTPCode uint64, uploads config.Uploa
}
// AddListener attaches handler event controller.
-func (h *Handler) AddListener(l events.Listener) {
+func (h *Handler) AddListener(l ...events.Listener) {
h.mul.Lock()
defer h.mul.Unlock()
@@ -192,7 +192,9 @@ func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time)
// sendEvent invokes event handler if any.
func (h *Handler) sendEvent(event interface{}) {
if h.lsn != nil {
- h.lsn(event)
+ for _, l := range h.lsn {
+ l(event)
+ }
}
}
diff --git a/plugins/http/metrics.go b/plugins/http/metrics.go
new file mode 100644
index 00000000..d7a9110b
--- /dev/null
+++ b/plugins/http/metrics.go
@@ -0,0 +1,92 @@
+package http
+
+import (
+ "strconv"
+
+ "github.com/prometheus/client_golang/prometheus"
+ handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
+)
+
+func (p *Plugin) MetricsCollector() []prometheus.Collector {
+ // p - implements Exporter interface (workers)
+ // other - request duration and count
+ return []prometheus.Collector{p, p.requestsExporter.requestDuration, p.requestsExporter.requestCounter}
+}
+
+func (p *Plugin) metricsCallback(event interface{}) {
+ switch e := event.(type) {
+ case handler.ResponseEvent:
+ p.requestsExporter.requestCounter.With(prometheus.Labels{
+ "status": strconv.Itoa(e.Response.Status),
+ }).Inc()
+
+ p.requestsExporter.requestDuration.With(prometheus.Labels{
+ "status": strconv.Itoa(e.Response.Status),
+ }).Observe(e.Elapsed().Seconds())
+ case handler.ErrorEvent:
+ p.requestsExporter.requestCounter.With(prometheus.Labels{
+ "status": "500",
+ }).Inc()
+
+ p.requestsExporter.requestDuration.With(prometheus.Labels{
+ "status": "500",
+ }).Observe(e.Elapsed().Seconds())
+ }
+}
+
+type workersExporter struct {
+ wm *prometheus.Desc
+ workersMemory uint64
+}
+
+func newWorkersExporter() *workersExporter {
+ return &workersExporter{
+ wm: prometheus.NewDesc("rr_http_workers_memory_bytes", "Memory usage by HTTP workers.", nil, nil),
+ workersMemory: 0,
+ }
+}
+
+func (p *Plugin) Describe(d chan<- *prometheus.Desc) {
+ // send description
+ d <- p.workersExporter.wm
+}
+
+func (p *Plugin) Collect(ch chan<- prometheus.Metric) {
+ // get the copy of the processes
+ workers := p.Workers()
+
+ // cumulative RSS memory in bytes
+ var cum uint64
+
+ // collect the memory
+ for i := 0; i < len(workers); i++ {
+ cum += workers[i].MemoryUsage
+ }
+
+ // send the values to the prometheus
+ ch <- prometheus.MustNewConstMetric(p.workersExporter.wm, prometheus.GaugeValue, float64(cum))
+}
+
+type requestsExporter struct {
+ requestCounter *prometheus.CounterVec
+ requestDuration *prometheus.HistogramVec
+}
+
+func newRequestsExporter() *requestsExporter {
+ return &requestsExporter{
+ requestCounter: prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: "rr_http_request_total",
+ Help: "Total number of handled http requests after server restart.",
+ },
+ []string{"status"},
+ ),
+ requestDuration: prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Name: "rr_http_request_duration_seconds",
+ Help: "HTTP request duration.",
+ },
+ []string{"status"},
+ ),
+ }
+}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index ba83344a..bec01ac3 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -62,6 +62,10 @@ type Plugin struct {
// servers RR handler
handler *handler.Handler
+ // metrics
+ workersExporter *workersExporter
+ requestsExporter *requestsExporter
+
// servers
http *http.Server
https *http.Server
@@ -102,6 +106,11 @@ func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server serv
p.cfg.Env = make(map[string]string)
}
+ // initialize workersExporter
+ p.workersExporter = newWorkersExporter()
+ // initialize requests exporter
+ p.requestsExporter = newRequestsExporter()
+
p.cfg.Env[RrMode] = "http"
p.server = server
@@ -159,7 +168,7 @@ func (p *Plugin) serve(errCh chan error) {
return
}
- p.handler.AddListener(p.logCallback)
+ p.handler.AddListener(p.logCallback, p.metricsCallback)
if p.cfg.EnableHTTP() {
if p.cfg.EnableH2C() {
@@ -341,7 +350,7 @@ func (p *Plugin) Reset() error {
}
p.log.Info("HTTP handler listeners successfully re-added")
- p.handler.AddListener(p.logCallback)
+ p.handler.AddListener(p.logCallback, p.metricsCallback)
p.log.Info("HTTP plugin successfully restarted")
return nil
@@ -386,7 +395,7 @@ func (p *Plugin) Ready() status.Status {
workers := p.workers()
for i := 0; i < len(workers); i++ {
// If state of the worker is ready (at least 1)
- // we assume, that plugin'p worker pool is ready
+ // we assume, that plugin's worker pool is ready
if workers[i].State().Value() == worker.StateReady {
return status.Status{
Code: http.StatusOK,
diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go
index 474bb21d..1f65e313 100644
--- a/plugins/metrics/plugin.go
+++ b/plugins/metrics/plugin.go
@@ -30,62 +30,81 @@ type Plugin struct {
http *http.Server
collectors sync.Map // all receivers are pointers
registry *prometheus.Registry
+
+ // prometheus Collectors
+ statProviders []StatProvider
}
// Init service.
-func (m *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
const op = errors.Op("metrics_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
- err := cfg.UnmarshalKey(PluginName, &m.cfg)
+ err := cfg.UnmarshalKey(PluginName, &p.cfg)
if err != nil {
return errors.E(op, errors.Disabled, err)
}
- m.cfg.InitDefaults()
+ p.cfg.InitDefaults()
- m.log = log
- m.registry = prometheus.NewRegistry()
+ p.log = log
+ p.registry = prometheus.NewRegistry()
// Default
- err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
+ err = p.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
if err != nil {
return errors.E(op, err)
}
// Default
- err = m.registry.Register(prometheus.NewGoCollector())
+ err = p.registry.Register(prometheus.NewGoCollector())
if err != nil {
return errors.E(op, err)
}
- collectors, err := m.cfg.getCollectors()
+ collectors, err := p.cfg.getCollectors()
if err != nil {
return errors.E(op, err)
}
// Register invocation will be later in the Serve method
for k, v := range collectors {
- m.collectors.Store(k, v)
+ p.collectors.Store(k, v)
}
+
+ p.statProviders = make([]StatProvider, 0, 2)
+
return nil
}
// Register new prometheus collector.
-func (m *Plugin) Register(c prometheus.Collector) error {
- return m.registry.Register(c)
+func (p *Plugin) Register(c prometheus.Collector) error {
+ return p.registry.Register(c)
}
// Serve prometheus metrics service.
-func (m *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
- m.collectors.Range(func(key, value interface{}) bool {
+
+ // register Collected stat providers
+ for i := 0; i < len(p.statProviders); i++ {
+ sp := p.statProviders[i]
+ for _, c := range sp.MetricsCollector() {
+ err := p.registry.Register(c)
+ if err != nil {
+ errCh <- err
+ return errCh
+ }
+ }
+ }
+
+ p.collectors.Range(func(key, value interface{}) bool {
// key - name
// value - prometheus.Collector
c := value.(prometheus.Collector)
- if err := m.registry.Register(c); err != nil {
+ if err := p.registry.Register(c); err != nil {
errCh <- err
return false
}
@@ -141,9 +160,9 @@ func (m *Plugin) Serve() chan error {
DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...)
DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...)
- m.http = &http.Server{
- Addr: m.cfg.Address,
- Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
+ p.http = &http.Server{
+ Addr: p.cfg.Address,
+ Handler: promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{}),
IdleTimeout: time.Hour * 24,
ReadTimeout: time.Minute * 60,
MaxHeaderBytes: maxHeaderSize,
@@ -163,7 +182,7 @@ func (m *Plugin) Serve() chan error {
}
go func() {
- err := m.http.ListenAndServe()
+ err := p.http.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
errCh <- err
return
@@ -174,55 +193,49 @@ func (m *Plugin) Serve() chan error {
}
// Stop prometheus metrics service.
-func (m *Plugin) Stop() error {
- m.mu.Lock()
- defer m.mu.Unlock()
+func (p *Plugin) Stop() error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
- if m.http != nil {
+ if p.http != nil {
// timeout is 10 seconds
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
- err := m.http.Shutdown(ctx)
+ err := p.http.Shutdown(ctx)
if err != nil {
// Function should be Stop() error
- m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
+ p.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err))
}
}
return nil
}
// Collects used to collect all plugins which implement metrics.StatProvider interface (and Named)
-func (m *Plugin) Collects() []interface{} {
+func (p *Plugin) Collects() []interface{} {
return []interface{}{
- m.AddStatProvider,
+ p.AddStatProvider,
}
}
// AddStatProvider adds a metrics provider
-func (m *Plugin) AddStatProvider(stat StatProvider) error {
- for _, c := range stat.MetricsCollector() {
- err := m.registry.Register(c)
- if err != nil {
- return err
- }
- }
+func (p *Plugin) AddStatProvider(stat StatProvider) error {
+ p.statProviders = append(p.statProviders, stat)
return nil
}
// Name returns user friendly plugin name
-func (m *Plugin) Name() string {
+func (p *Plugin) Name() string {
return PluginName
}
// RPC interface satisfaction
-func (m *Plugin) RPC() interface{} {
+func (p *Plugin) RPC() interface{} {
return &rpcServer{
- svc: m,
- log: m.log,
+ svc: p,
+ log: p.log,
}
}
// Available interface implementation
-func (m *Plugin) Available() {
-}
+func (p *Plugin) Available() {}
diff --git a/tests/plugins/metrics/configs/.rr-http-metrics.yaml b/tests/plugins/metrics/configs/.rr-http-metrics.yaml
new file mode 100644
index 00000000..95f131c0
--- /dev/null
+++ b/tests/plugins/metrics/configs/.rr-http-metrics.yaml
@@ -0,0 +1,20 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ relay: "pipes"
+
+http:
+ address: 127.0.0.1:13223
+ max_request_size: 1024
+ middleware: [ ]
+ pool:
+ num_workers: 1
+
+metrics:
+ address: localhost:2112
+
+logs:
+ mode: development
+ level: debug
diff --git a/tests/plugins/metrics/metrics_test.go b/tests/plugins/metrics/metrics_test.go
index 3e2023d4..84c60592 100644
--- a/tests/plugins/metrics/metrics_test.go
+++ b/tests/plugins/metrics/metrics_test.go
@@ -972,3 +972,98 @@ func declareMetricsTest(t *testing.T) {
assert.NoError(t, err)
assert.True(t, ret)
}
+
+func TestHTTPMetrics(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ cfg := &config.Viper{}
+ cfg.Prefix = "rr"
+ cfg.Path = "configs/.rr-http-metrics.yaml"
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes()
+ mockLogger.EXPECT().Debug("200 GET http://localhost:13223/", "remote", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1)
+
+ err = cont.RegisterAll(
+ cfg,
+ &metrics.Plugin{},
+ &server.Plugin{},
+ &httpPlugin.Plugin{},
+ mockLogger,
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ tt := time.NewTimer(time.Minute * 3)
+
+ go func() {
+ defer tt.Stop()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 2)
+ t.Run("req1", echoHTTP)
+ t.Run("req2", echoHTTP)
+
+ genericOut, err := get()
+ assert.NoError(t, err)
+ assert.Contains(t, genericOut, `rr_http_request_duration_seconds_bucket`)
+ assert.Contains(t, genericOut, `rr_http_request_duration_seconds_sum{status="200"}`)
+ assert.Contains(t, genericOut, `rr_http_request_duration_seconds_count{status="200"} 2`)
+ assert.Contains(t, genericOut, `rr_http_request_total{status="200"} 2`)
+ assert.Contains(t, genericOut, "rr_http_workers_memory_bytes")
+
+ close(sig)
+}
+
+func echoHTTP(t *testing.T) {
+ req, err := http.NewRequest("GET", "http://localhost:13223", nil)
+ assert.NoError(t, err)
+
+ r, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ _, err = ioutil.ReadAll(r.Body)
+ assert.NoError(t, err)
+ assert.Equal(t, 200, r.StatusCode)
+
+ err = r.Body.Close()
+ assert.NoError(t, err)
+}