diff options
-rw-r--r-- | pkg/worker_handler/handler.go | 8 | ||||
-rw-r--r-- | plugins/http/metrics.go | 92 | ||||
-rw-r--r-- | plugins/http/plugin.go | 15 | ||||
-rw-r--r-- | plugins/metrics/plugin.go | 91 | ||||
-rw-r--r-- | tests/plugins/metrics/configs/.rr-http-metrics.yaml | 20 | ||||
-rw-r--r-- | tests/plugins/metrics/metrics_test.go | 95 |
6 files changed, 276 insertions, 45 deletions
diff --git a/pkg/worker_handler/handler.go b/pkg/worker_handler/handler.go index 0ff23d9d..2534a331 100644 --- a/pkg/worker_handler/handler.go +++ b/pkg/worker_handler/handler.go @@ -63,7 +63,7 @@ type Handler struct { log logger.Logger pool pool.Pool mul sync.Mutex - lsn events.Listener + lsn []events.Listener internalHTTPCode uint64 } @@ -82,7 +82,7 @@ func NewHandler(maxReqSize uint64, internalHTTPCode uint64, uploads config.Uploa } // AddListener attaches handler event controller. -func (h *Handler) AddListener(l events.Listener) { +func (h *Handler) AddListener(l ...events.Listener) { h.mul.Lock() defer h.mul.Unlock() @@ -192,7 +192,9 @@ func (h *Handler) handleResponse(req *Request, resp *Response, start time.Time) // sendEvent invokes event handler if any. func (h *Handler) sendEvent(event interface{}) { if h.lsn != nil { - h.lsn(event) + for _, l := range h.lsn { + l(event) + } } } diff --git a/plugins/http/metrics.go b/plugins/http/metrics.go new file mode 100644 index 00000000..d7a9110b --- /dev/null +++ b/plugins/http/metrics.go @@ -0,0 +1,92 @@ +package http + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" + handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" +) + +func (p *Plugin) MetricsCollector() []prometheus.Collector { + // p - implements Exporter interface (workers) + // other - request duration and count + return []prometheus.Collector{p, p.requestsExporter.requestDuration, p.requestsExporter.requestCounter} +} + +func (p *Plugin) metricsCallback(event interface{}) { + switch e := event.(type) { + case handler.ResponseEvent: + p.requestsExporter.requestCounter.With(prometheus.Labels{ + "status": strconv.Itoa(e.Response.Status), + }).Inc() + + p.requestsExporter.requestDuration.With(prometheus.Labels{ + "status": strconv.Itoa(e.Response.Status), + }).Observe(e.Elapsed().Seconds()) + case handler.ErrorEvent: + p.requestsExporter.requestCounter.With(prometheus.Labels{ + "status": "500", + }).Inc() + + p.requestsExporter.requestDuration.With(prometheus.Labels{ + "status": "500", + }).Observe(e.Elapsed().Seconds()) + } +} + +type workersExporter struct { + wm *prometheus.Desc + workersMemory uint64 +} + +func newWorkersExporter() *workersExporter { + return &workersExporter{ + wm: prometheus.NewDesc("rr_http_workers_memory_bytes", "Memory usage by HTTP workers.", nil, nil), + workersMemory: 0, + } +} + +func (p *Plugin) Describe(d chan<- *prometheus.Desc) { + // send description + d <- p.workersExporter.wm +} + +func (p *Plugin) Collect(ch chan<- prometheus.Metric) { + // get the copy of the processes + workers := p.Workers() + + // cumulative RSS memory in bytes + var cum uint64 + + // collect the memory + for i := 0; i < len(workers); i++ { + cum += workers[i].MemoryUsage + } + + // send the values to the prometheus + ch <- prometheus.MustNewConstMetric(p.workersExporter.wm, prometheus.GaugeValue, float64(cum)) +} + +type requestsExporter struct { + requestCounter *prometheus.CounterVec + requestDuration *prometheus.HistogramVec +} + +func newRequestsExporter() *requestsExporter { + return &requestsExporter{ + requestCounter: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "rr_http_request_total", + Help: "Total number of handled http requests after server restart.", + }, + []string{"status"}, + ), + requestDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "rr_http_request_duration_seconds", + Help: "HTTP request duration.", + }, + []string{"status"}, + ), + } +} diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index ba83344a..bec01ac3 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -62,6 +62,10 @@ type Plugin struct { // servers RR handler handler *handler.Handler + // metrics + workersExporter *workersExporter + requestsExporter *requestsExporter + // servers http *http.Server https *http.Server @@ -102,6 +106,11 @@ func (p *Plugin) Init(cfg config.Configurer, rrLogger logger.Logger, server serv p.cfg.Env = make(map[string]string) } + // initialize workersExporter + p.workersExporter = newWorkersExporter() + // initialize requests exporter + p.requestsExporter = newRequestsExporter() + p.cfg.Env[RrMode] = "http" p.server = server @@ -159,7 +168,7 @@ func (p *Plugin) serve(errCh chan error) { return } - p.handler.AddListener(p.logCallback) + p.handler.AddListener(p.logCallback, p.metricsCallback) if p.cfg.EnableHTTP() { if p.cfg.EnableH2C() { @@ -341,7 +350,7 @@ func (p *Plugin) Reset() error { } p.log.Info("HTTP handler listeners successfully re-added") - p.handler.AddListener(p.logCallback) + p.handler.AddListener(p.logCallback, p.metricsCallback) p.log.Info("HTTP plugin successfully restarted") return nil @@ -386,7 +395,7 @@ func (p *Plugin) Ready() status.Status { workers := p.workers() for i := 0; i < len(workers); i++ { // If state of the worker is ready (at least 1) - // we assume, that plugin'p worker pool is ready + // we assume, that plugin's worker pool is ready if workers[i].State().Value() == worker.StateReady { return status.Status{ Code: http.StatusOK, diff --git a/plugins/metrics/plugin.go b/plugins/metrics/plugin.go index 474bb21d..1f65e313 100644 --- a/plugins/metrics/plugin.go +++ b/plugins/metrics/plugin.go @@ -30,62 +30,81 @@ type Plugin struct { http *http.Server collectors sync.Map // all receivers are pointers registry *prometheus.Registry + + // prometheus Collectors + statProviders []StatProvider } // Init service. -func (m *Plugin) Init(cfg config.Configurer, log logger.Logger) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { const op = errors.Op("metrics_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } - err := cfg.UnmarshalKey(PluginName, &m.cfg) + err := cfg.UnmarshalKey(PluginName, &p.cfg) if err != nil { return errors.E(op, errors.Disabled, err) } - m.cfg.InitDefaults() + p.cfg.InitDefaults() - m.log = log - m.registry = prometheus.NewRegistry() + p.log = log + p.registry = prometheus.NewRegistry() // Default - err = m.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + err = p.registry.Register(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) if err != nil { return errors.E(op, err) } // Default - err = m.registry.Register(prometheus.NewGoCollector()) + err = p.registry.Register(prometheus.NewGoCollector()) if err != nil { return errors.E(op, err) } - collectors, err := m.cfg.getCollectors() + collectors, err := p.cfg.getCollectors() if err != nil { return errors.E(op, err) } // Register invocation will be later in the Serve method for k, v := range collectors { - m.collectors.Store(k, v) + p.collectors.Store(k, v) } + + p.statProviders = make([]StatProvider, 0, 2) + return nil } // Register new prometheus collector. -func (m *Plugin) Register(c prometheus.Collector) error { - return m.registry.Register(c) +func (p *Plugin) Register(c prometheus.Collector) error { + return p.registry.Register(c) } // Serve prometheus metrics service. -func (m *Plugin) Serve() chan error { +func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) - m.collectors.Range(func(key, value interface{}) bool { + + // register Collected stat providers + for i := 0; i < len(p.statProviders); i++ { + sp := p.statProviders[i] + for _, c := range sp.MetricsCollector() { + err := p.registry.Register(c) + if err != nil { + errCh <- err + return errCh + } + } + } + + p.collectors.Range(func(key, value interface{}) bool { // key - name // value - prometheus.Collector c := value.(prometheus.Collector) - if err := m.registry.Register(c); err != nil { + if err := p.registry.Register(c); err != nil { errCh <- err return false } @@ -141,9 +160,9 @@ func (m *Plugin) Serve() chan error { DefaultCipherSuites = append(DefaultCipherSuites, topCipherSuites...) DefaultCipherSuites = append(DefaultCipherSuites, defaultCipherSuitesTLS13...) - m.http = &http.Server{ - Addr: m.cfg.Address, - Handler: promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}), + p.http = &http.Server{ + Addr: p.cfg.Address, + Handler: promhttp.HandlerFor(p.registry, promhttp.HandlerOpts{}), IdleTimeout: time.Hour * 24, ReadTimeout: time.Minute * 60, MaxHeaderBytes: maxHeaderSize, @@ -163,7 +182,7 @@ func (m *Plugin) Serve() chan error { } go func() { - err := m.http.ListenAndServe() + err := p.http.ListenAndServe() if err != nil && err != http.ErrServerClosed { errCh <- err return @@ -174,55 +193,49 @@ func (m *Plugin) Serve() chan error { } // Stop prometheus metrics service. -func (m *Plugin) Stop() error { - m.mu.Lock() - defer m.mu.Unlock() +func (p *Plugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() - if m.http != nil { + if p.http != nil { // timeout is 10 seconds ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - err := m.http.Shutdown(ctx) + err := p.http.Shutdown(ctx) if err != nil { // Function should be Stop() error - m.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err)) + p.log.Error("stop error", "error", errors.Errorf("error shutting down the metrics server: error %v", err)) } } return nil } // Collects used to collect all plugins which implement metrics.StatProvider interface (and Named) -func (m *Plugin) Collects() []interface{} { +func (p *Plugin) Collects() []interface{} { return []interface{}{ - m.AddStatProvider, + p.AddStatProvider, } } // AddStatProvider adds a metrics provider -func (m *Plugin) AddStatProvider(stat StatProvider) error { - for _, c := range stat.MetricsCollector() { - err := m.registry.Register(c) - if err != nil { - return err - } - } +func (p *Plugin) AddStatProvider(stat StatProvider) error { + p.statProviders = append(p.statProviders, stat) return nil } // Name returns user friendly plugin name -func (m *Plugin) Name() string { +func (p *Plugin) Name() string { return PluginName } // RPC interface satisfaction -func (m *Plugin) RPC() interface{} { +func (p *Plugin) RPC() interface{} { return &rpcServer{ - svc: m, - log: m.log, + svc: p, + log: p.log, } } // Available interface implementation -func (m *Plugin) Available() { -} +func (p *Plugin) Available() {} diff --git a/tests/plugins/metrics/configs/.rr-http-metrics.yaml b/tests/plugins/metrics/configs/.rr-http-metrics.yaml new file mode 100644 index 00000000..95f131c0 --- /dev/null +++ b/tests/plugins/metrics/configs/.rr-http-metrics.yaml @@ -0,0 +1,20 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + relay: "pipes" + +http: + address: 127.0.0.1:13223 + max_request_size: 1024 + middleware: [ ] + pool: + num_workers: 1 + +metrics: + address: localhost:2112 + +logs: + mode: development + level: debug diff --git a/tests/plugins/metrics/metrics_test.go b/tests/plugins/metrics/metrics_test.go index 3e2023d4..84c60592 100644 --- a/tests/plugins/metrics/metrics_test.go +++ b/tests/plugins/metrics/metrics_test.go @@ -972,3 +972,98 @@ func declareMetricsTest(t *testing.T) { assert.NoError(t, err) assert.True(t, ret) } + +func TestHTTPMetrics(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + if err != nil { + t.Fatal(err) + } + + cfg := &config.Viper{} + cfg.Prefix = "rr" + cfg.Path = "configs/.rr-http-metrics.yaml" + + controller := gomock.NewController(t) + mockLogger := mocks.NewMockLogger(controller) + + mockLogger.EXPECT().Debug("worker destructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("worker constructed", "pid", gomock.Any()).AnyTimes() + mockLogger.EXPECT().Debug("200 GET http://localhost:13223/", "remote", gomock.Any(), "elapsed", gomock.Any()).MinTimes(1) + + err = cont.RegisterAll( + cfg, + &metrics.Plugin{}, + &server.Plugin{}, + &httpPlugin.Plugin{}, + mockLogger, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + tt := time.NewTimer(time.Minute * 3) + + go func() { + defer tt.Stop() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-tt.C: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 2) + t.Run("req1", echoHTTP) + t.Run("req2", echoHTTP) + + genericOut, err := get() + assert.NoError(t, err) + assert.Contains(t, genericOut, `rr_http_request_duration_seconds_bucket`) + assert.Contains(t, genericOut, `rr_http_request_duration_seconds_sum{status="200"}`) + assert.Contains(t, genericOut, `rr_http_request_duration_seconds_count{status="200"} 2`) + assert.Contains(t, genericOut, `rr_http_request_total{status="200"} 2`) + assert.Contains(t, genericOut, "rr_http_workers_memory_bytes") + + close(sig) +} + +func echoHTTP(t *testing.T) { + req, err := http.NewRequest("GET", "http://localhost:13223", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + _, err = ioutil.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + + err = r.Body.Close() + assert.NoError(t, err) +} |