diff options
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 4 | ||||
-rw-r--r-- | service/headers/service_test.go | 288 | ||||
-rw-r--r-- | service/health/service.go | 25 | ||||
-rw-r--r-- | service/http/service_test.go | 607 | ||||
-rw-r--r-- | service/metrics/service.go | 28 |
6 files changed, 621 insertions, 332 deletions
@@ -6,6 +6,7 @@ require ( github.com/NYTimes/gziphandler v1.1.1 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37 + github.com/cenkalti/backoff/v4 v4.0.0 github.com/dustin/go-humanize v1.0.0 github.com/go-ole/go-ole v1.2.4 // indirect github.com/json-iterator/go v1.1.9 @@ -27,6 +27,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37 h1:uxxtrnACqI9zK4ENDMf0WpXfUsHP5V8liuq5QdgDISU= github.com/buger/goterm v0.0.0-20181115115552-c206103e1f37/go.mod h1:u9UyCz2eTrSGy6fbupqJ54eY5c4IC8gREQ1053dK12U= +github.com/cenkalti/backoff v1.1.0 h1:QnvVp8ikKCDWOsFheytRCoYWYPO/ObCTBGxT19Hc+yE= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= +github.com/cenkalti/backoff/v4 v4.0.0 h1:6VeaLF9aI+MAUQ95106HwWzYZgJJpZ4stumjj6RFYAU= +github.com/cenkalti/backoff/v4 v4.0.0/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= diff --git a/service/headers/service_test.go b/service/headers/service_test.go index e4bbfb84..8d727c15 100644 --- a/service/headers/service_test.go +++ b/service/headers/service_test.go @@ -1,6 +1,7 @@ package headers import ( + "github.com/cenkalti/backoff/v4" json "github.com/json-iterator/go" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" @@ -35,16 +36,20 @@ func (cfg *testCfg) Unmarshal(out interface{}) error { } func Test_RequestHeaders(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(rrhttp.ID, &rrhttp.Service{}) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) - assert.NoError(t, c.Init(&testCfg{ - headers: `{"request":{"input": "custom-header"}}`, - httpCfg: `{ + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + headers: `{"request":{"input": "custom-header"}}`, + httpCfg: `{ "enable": true, "address": ":6078", "maxRequestSize": 1024, @@ -59,46 +64,62 @@ func Test_RequestHeaders(t *testing.T) { } }`})) - go func() { - err := c.Serve() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during Serve: error %v", err) + } + }() + + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("GET", "http://localhost:6078?hello=value", nil) if err != nil { - t.Errorf("error during Serve: error %v", err) + return err } - }() - time.Sleep(time.Millisecond * 100) - defer c.Stop() - req, err := http.NewRequest("GET", "http://localhost:6078?hello=value", nil) - assert.NoError(t, err) + r, err := http.DefaultClient.Do(req) + if err != nil { + return err + } - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - defer func() { - err := r.Body.Close() + b, err := ioutil.ReadAll(r.Body) if err != nil { - t.Errorf("error during the body closing: error %v", err) + return err } - }() - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "CUSTOM-HEADER", string(b)) - assert.NoError(t, err) - assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, "CUSTOM-HEADER", string(b)) + err = r.Body.Close() + if err != nil { + return err + } + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } } func Test_ResponseHeaders(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) - c := service.NewContainer(logger) - c.Register(rrhttp.ID, &rrhttp.Service{}) - c.Register(ID, &Service{}) + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) - assert.NoError(t, c.Init(&testCfg{ - headers: `{"response":{"output": "output-header"},"request":{"input": "custom-header"}}`, - httpCfg: `{ + assert.NoError(t, c.Init(&testCfg{ + headers: `{"response":{"output": "output-header"},"request":{"input": "custom-header"}}`, + httpCfg: `{ "enable": true, "address": ":6079", "maxRequestSize": 1024, @@ -113,46 +134,61 @@ func Test_ResponseHeaders(t *testing.T) { } }`})) - go func() { - err := c.Serve() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("GET", "http://localhost:6079?hello=value", nil) + if err != nil { + return err + } + + r, err := http.DefaultClient.Do(req) if err != nil { - t.Errorf("error during the Serve: error %v", err) + return err } - }() - time.Sleep(time.Millisecond * 100) - defer c.Stop() - req, err := http.NewRequest("GET", "http://localhost:6079?hello=value", nil) - assert.NoError(t, err) + assert.Equal(t, "output-header", r.Header.Get("output")) - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - defer func() { - err := r.Body.Close() + b, err := ioutil.ReadAll(r.Body) if err != nil { - t.Errorf("error during the body closing: error %v", err) + return err } - }() + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "CUSTOM-HEADER", string(b)) - assert.Equal(t, "output-header", r.Header.Get("output")) + err = r.Body.Close() + if err != nil { + return err + } - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) + return nil + }, bkoff) - assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, "CUSTOM-HEADER", string(b)) + if err != nil { + t.Fatal(err) + } } func TestCORS_OPTIONS(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 + + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) - c := service.NewContainer(logger) - c.Register(rrhttp.ID, &rrhttp.Service{}) - c.Register(ID, &Service{}) + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) - assert.NoError(t, c.Init(&testCfg{ - headers: `{ + assert.NoError(t, c.Init(&testCfg{ + headers: `{ "cors":{ "allowedOrigin": "*", "allowedHeaders": "*", @@ -162,7 +198,7 @@ func TestCORS_OPTIONS(t *testing.T) { "maxAge": 600 } }`, - httpCfg: `{ + httpCfg: `{ "enable": true, "address": ":6379", "maxRequestSize": 1024, @@ -177,50 +213,65 @@ func TestCORS_OPTIONS(t *testing.T) { } }`})) - go func() { - err := c.Serve() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("OPTIONS", "http://localhost:6379", nil) if err != nil { - t.Errorf("error during the Serve: error %v", err) + return err } - }() - time.Sleep(time.Millisecond * 100) - defer c.Stop() - req, err := http.NewRequest("OPTIONS", "http://localhost:6379", nil) - assert.NoError(t, err) + r, err := http.DefaultClient.Do(req) + if err != nil { + return err + } - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - defer func() { - err := r.Body.Close() + assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) + assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Headers")) + assert.Equal(t, "GET,POST,PUT,DELETE", r.Header.Get("Access-Control-Allow-Methods")) + assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Origin")) + assert.Equal(t, "600", r.Header.Get("Access-Control-Max-Age")) + assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) + + _, err = ioutil.ReadAll(r.Body) if err != nil { - t.Errorf("error during the body closing: error %v", err) + return err } - }() + assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) - assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Headers")) - assert.Equal(t, "GET,POST,PUT,DELETE", r.Header.Get("Access-Control-Allow-Methods")) - assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Origin")) - assert.Equal(t, "600", r.Header.Get("Access-Control-Max-Age")) - assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) + err = r.Body.Close() + if err != nil { + return err + } - _, err = ioutil.ReadAll(r.Body) - assert.NoError(t, err) + return nil + }, bkoff) - assert.Equal(t, 200, r.StatusCode) + if err != nil { + t.Fatal(err) + } } func TestCORS_Pass(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(rrhttp.ID, &rrhttp.Service{}) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) - assert.NoError(t, c.Init(&testCfg{ - headers: `{ + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + headers: `{ "cors":{ "allowedOrigin": "*", "allowedHeaders": "*", @@ -230,7 +281,7 @@ func TestCORS_Pass(t *testing.T) { "maxAge": 600 } }`, - httpCfg: `{ + httpCfg: `{ "enable": true, "address": ":6672", "maxRequestSize": 1024, @@ -245,32 +296,45 @@ func TestCORS_Pass(t *testing.T) { } }`})) - go func() { - err := c.Serve() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("error during the Serve: error %v", err) + } + }() + time.Sleep(time.Millisecond * 100) + defer c.Stop() + + req, err := http.NewRequest("GET", "http://localhost:6672", nil) + if err != nil { + return err + } + + r, err := http.DefaultClient.Do(req) if err != nil { - t.Errorf("error during the Serve: error %v", err) + return err } - }() - time.Sleep(time.Millisecond * 100) - defer c.Stop() - req, err := http.NewRequest("GET", "http://localhost:6672", nil) - assert.NoError(t, err) + assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) + assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Headers")) + assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Origin")) + assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) - assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Headers")) - assert.Equal(t, "*", r.Header.Get("Access-Control-Allow-Origin")) - assert.Equal(t, "true", r.Header.Get("Access-Control-Allow-Credentials")) + _, err = ioutil.ReadAll(r.Body) + if err != nil { + return err + } + assert.Equal(t, 200, r.StatusCode) - _, err = ioutil.ReadAll(r.Body) - assert.NoError(t, err) + err = r.Body.Close() + if err != nil { + return err + } - assert.Equal(t, 200, r.StatusCode) + return nil + }, bkoff) - err = r.Body.Close() if err != nil { - t.Errorf("error during the body closing: error %v", err) + t.Fatal(err) } } diff --git a/service/health/service.go b/service/health/service.go index c82f43b5..ce127340 100644 --- a/service/health/service.go +++ b/service/health/service.go @@ -6,12 +6,17 @@ import ( "github.com/sirupsen/logrus" "net/http" "sync" + "time" rrhttp "github.com/spiral/roadrunner/service/http" ) -// ID declares the public service name -const ID = "health" +const ( + // ID declares public service name. + ID = "health" + // maxHeaderSize declares max header size for prometheus server + maxHeaderSize = 1024 * 1024 * 100 // 104MB +) // Service to serve an endpoint for checking the health of the worker pool type Service struct { @@ -39,15 +44,23 @@ func (s *Service) Init(cfg *Config, r *rrhttp.Service, log *logrus.Logger) (bool func (s *Service) Serve() error { // Configure and start the http server s.mu.Lock() - s.http = &http.Server{Addr: s.cfg.Address, Handler: s} + s.http = &http.Server{ + Addr: s.cfg.Address, + Handler: s, + IdleTimeout: time.Hour * 24, + ReadTimeout: time.Minute * 60, + MaxHeaderBytes: maxHeaderSize, + ReadHeaderTimeout: time.Minute * 60, + WriteTimeout: time.Minute * 60, + } s.mu.Unlock() err := s.http.ListenAndServe() - if err == nil || err == http.ErrServerClosed { - return nil + if err != nil && err != http.ErrServerClosed { + return err } - return err + return nil } // Stop the health endpoint diff --git a/service/http/service_test.go b/service/http/service_test.go index 53dbb3df..f7ee33cc 100644 --- a/service/http/service_test.go +++ b/service/http/service_test.go @@ -1,6 +1,7 @@ package http import ( + "github.com/cenkalti/backoff/v4" json "github.com/json-iterator/go" "github.com/sirupsen/logrus" "github.com/sirupsen/logrus/hooks/test" @@ -77,13 +78,17 @@ func Test_Service_Configure_Disable(t *testing.T) { } func Test_Service_Configure_Enable(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":8070", "maxRequestSize": 1024, @@ -100,21 +105,36 @@ func Test_Service_Configure_Enable(t *testing.T) { "destroyTimeout": 10000000 } } - }`})) + }`}) + if err != nil { + return err + } + + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) } func Test_Service_Echo(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6536", "maxRequestSize": 1024, @@ -131,51 +151,74 @@ func Test_Service_Echo(t *testing.T) { "destroyTimeout": 10000000 } } - }`})) + }`}) + if err != nil { + return err + } - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + // should do nothing + s.(*Service).Stop() + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() - // should do nothing - s.(*Service).Stop() + time.Sleep(time.Millisecond * 100) - go func() { - err := c.Serve() + req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil) if err != nil { - t.Errorf("serve error: %v", err) + c.Stop() + return err } - }() - time.Sleep(time.Millisecond * 100) - req, err := http.NewRequest("GET", "http://localhost:6536?hello=world", nil) - assert.NoError(t, err) + r, err := http.DefaultClient.Do(req) + if err != nil { + c.Stop() + return err + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) + err = r.Body.Close() + if err != nil { + c.Stop() + return err + } - assert.NoError(t, err) - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) + c.Stop() + return nil + }, bkoff) - err2 := r.Body.Close() - if err2 != nil { - t.Errorf("error closing the Body: error %v", err2) + if err != nil { + t.Fatal(err) } - c.Stop() } func Test_Service_Env(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(env.ID, env.NewService(map[string]string{"rr": "test"})) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + c := service.NewContainer(logger) + c.Register(env.ID, env.NewService(map[string]string{"rr": "test"})) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":10031", "maxRequestSize": 1024, @@ -192,53 +235,76 @@ func Test_Service_Env(t *testing.T) { "destroyTimeout": 10000000 } } - }`, envCfg: `{"env_key":"ENV_VALUE"}`})) + }`, envCfg: `{"env_key":"ENV_VALUE"}`}) + if err != nil { + return err + } - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) - // should do nothing - s.(*Service).Stop() + // should do nothing + s.(*Service).Stop() - go func() { - err := c.Serve() + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() + + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "http://localhost:10031", nil) if err != nil { - t.Errorf("serve error: %v", err) + c.Stop() + return err } - }() - time.Sleep(time.Second * 1) + r, err := http.DefaultClient.Do(req) + if err != nil { + c.Stop() + return err + } - req, err := http.NewRequest("GET", "http://localhost:10031", nil) - assert.NoError(t, err) + b, err := ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } + + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, "ENV_VALUE", string(b)) - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - defer func() { - err := r.Body.Close() + err = r.Body.Close() if err != nil { - t.Errorf("error closing the Body: error %v", err) + c.Stop() + return err } - }() - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) + c.Stop() + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } - assert.NoError(t, err) - assert.Equal(t, 200, r.StatusCode) - assert.Equal(t, "ENV_VALUE", string(b)) - c.Stop() } func Test_Service_ErrorEcho(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6030", "maxRequestSize": 1024, @@ -255,61 +321,83 @@ func Test_Service_ErrorEcho(t *testing.T) { "destroyTimeout": 10000000 } } - }`})) + }`}) + if err != nil { + return err + } - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) - goterr := make(chan interface{}) - s.(*Service).AddListener(func(event int, ctx interface{}) { - if event == roadrunner.EventStderrOutput { - if string(ctx.([]byte)) == "WORLD\n" { - goterr <- nil + goterr := make(chan interface{}) + s.(*Service).AddListener(func(event int, ctx interface{}) { + if event == roadrunner.EventStderrOutput { + if string(ctx.([]byte)) == "WORLD\n" { + goterr <- nil + } } + }) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() + + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) + if err != nil { + c.Stop() + return err } - }) - go func() { - err := c.Serve() + r, err := http.DefaultClient.Do(req) if err != nil { - t.Errorf("serve error: %v", err) + c.Stop() + return err } - }() - time.Sleep(time.Second) + b, err := ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } - req, err := http.NewRequest("GET", "http://localhost:6030?hello=world", nil) - assert.NoError(t, err) + <-goterr - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) - defer func() { - err := r.Body.Close() + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) + err = r.Body.Close() if err != nil { - t.Errorf("error closing the Body: error %v", err) + c.Stop() + return err } - }() - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) + c.Stop() - <-goterr + return nil + }, bkoff) - assert.NoError(t, err) - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) - c.Stop() + if err != nil { + t.Fatal(err) + } } func Test_Service_Middleware(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6032", "maxRequestSize": 1024, @@ -326,80 +414,112 @@ func Test_Service_Middleware(t *testing.T) { "destroyTimeout": 10000000 } } - }`})) + }`}) + if err != nil { + return err + } - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) - - s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/halt" { - w.WriteHeader(500) - _, err := w.Write([]byte("halted")) - if err != nil { - t.Errorf("error writing the data to the http reply: error %v", err) + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) + + s.(*Service).AddMiddleware(func(f http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/halt" { + w.WriteHeader(500) + _, err := w.Write([]byte("halted")) + if err != nil { + t.Errorf("error writing the data to the http reply: error %v", err) + } + } else { + f(w, r) } - } else { - f(w, r) } + }) + + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() + time.Sleep(time.Millisecond * 500) + + req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil) + if err != nil { + c.Stop() + return err } - }) - go func() { - err := c.Serve() + r, err := http.DefaultClient.Do(req) if err != nil { - t.Errorf("serve error: %v", err) + c.Stop() + return err } - }() - time.Sleep(time.Second) - req, err := http.NewRequest("GET", "http://localhost:6032?hello=world", nil) - assert.NoError(t, err) + b, err := ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } - r, err := http.DefaultClient.Do(req) - assert.NoError(t, err) + assert.Equal(t, 201, r.StatusCode) + assert.Equal(t, "WORLD", string(b)) - b, err := ioutil.ReadAll(r.Body) - assert.NoError(t, err) + err = r.Body.Close() + if err != nil { + c.Stop() + return err + } - assert.NoError(t, err) - assert.Equal(t, 201, r.StatusCode) - assert.Equal(t, "WORLD", string(b)) + req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil) + if err != nil { + c.Stop() + return err + } - err = r.Body.Close() - if err != nil { - t.Errorf("error closing the Body: error %v", err) - } + r, err = http.DefaultClient.Do(req) + if err != nil { + c.Stop() + return err + } + b, err = ioutil.ReadAll(r.Body) + if err != nil { + c.Stop() + return err + } - req, err = http.NewRequest("GET", "http://localhost:6032/halt", nil) - assert.NoError(t, err) + assert.Equal(t, 500, r.StatusCode) + assert.Equal(t, "halted", string(b)) - r, err = http.DefaultClient.Do(req) - assert.NoError(t, err) - b, err = ioutil.ReadAll(r.Body) - assert.NoError(t, err) + err = r.Body.Close() + if err != nil { + c.Stop() + return err + } + c.Stop() - assert.NoError(t, err) - assert.Equal(t, 500, r.StatusCode) - assert.Equal(t, "halted", string(b)) + return nil + }, bkoff) - err = r.Body.Close() if err != nil { - c.Stop() - t.Errorf("error closing the Body: error %v", err) + t.Fatal(err) } - c.Stop() + } func Test_Service_Listener(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6033", "maxRequestSize": 1024, @@ -416,39 +536,53 @@ func Test_Service_Listener(t *testing.T) { "destroyTimeout": 10000000 } } - }`})) + }`}) + if err != nil { + return err + } - s, st := c.Get(ID) - assert.NotNil(t, s) - assert.Equal(t, service.StatusOK, st) + s, st := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, st) - stop := make(chan interface{}) - s.(*Service).AddListener(func(event int, ctx interface{}) { - if event == roadrunner.EventServerStart { - stop <- nil - } - }) + stop := make(chan interface{}) + s.(*Service).AddListener(func(event int, ctx interface{}) { + if event == roadrunner.EventServerStart { + stop <- nil + } + }) - go func() { - err := c.Serve() - if err != nil { - t.Errorf("serve error: %v", err) - } - }() - time.Sleep(time.Millisecond * 100) + go func() { + err := c.Serve() + if err != nil { + t.Errorf("serve error: %v", err) + } + }() + time.Sleep(time.Millisecond * 500) - c.Stop() - assert.True(t, true) + c.Stop() + assert.True(t, true) + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } } func Test_Service_Error(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6034", "maxRequestSize": 1024, @@ -465,19 +599,37 @@ func Test_Service_Error(t *testing.T) { "destroyTimeout": 10000000 } } - }`})) + }`}) + if err != nil { + return err + } + + // assert error + err = c.Serve() + if err == nil { + return err + } + + return nil + }, bkoff) - assert.Error(t, c.Serve()) + if err != nil { + t.Fatal(err) + } } func Test_Service_Error2(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) - assert.NoError(t, c.Init(&testCfg{httpCfg: `{ + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6035", "maxRequestSize": 1024, @@ -494,19 +646,37 @@ func Test_Service_Error2(t *testing.T) { "destroyTimeout": 10000000 } } - }`})) + }`}) + if err != nil { + return err + } + + // assert error + err = c.Serve() + if err == nil { + return err + } - assert.Error(t, c.Serve()) + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } } func Test_Service_Error3(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) - assert.Error(t, c.Init(&testCfg{httpCfg: `{ + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": ":6036", "maxRequestSize": 1024, @@ -523,17 +693,33 @@ func Test_Service_Error3(t *testing.T) { "destroyTimeout": 10000000 } } - }`})) + }`}) + // assert error + if err == nil { + return err + } + + return nil + }, bkoff) + + if err != nil { + t.Fatal(err) + } + } func Test_Service_Error4(t *testing.T) { - logger, _ := test.NewNullLogger() - logger.SetLevel(logrus.DebugLevel) + bkoff := backoff.NewExponentialBackOff() + bkoff.MaxElapsedTime = time.Second * 15 - c := service.NewContainer(logger) - c.Register(ID, &Service{}) + err := backoff.Retry(func() error { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) - assert.Error(t, c.Init(&testCfg{httpCfg: `{ + err := c.Init(&testCfg{httpCfg: `{ "enable": true, "address": "----", "maxRequestSize": 1024, @@ -550,7 +736,18 @@ func Test_Service_Error4(t *testing.T) { "destroyTimeout": 10000000 } } - }`})) + }`}) + // assert error + if err != nil { + return nil + } + + return err + }, bkoff) + + if err != nil { + t.Fatal(err) + } } func tmpDir() string { diff --git a/service/metrics/service.go b/service/metrics/service.go index 6fa4da50..0b667f2d 100644 --- a/service/metrics/service.go +++ b/service/metrics/service.go @@ -11,10 +11,15 @@ import ( "github.com/spiral/roadrunner/service/rpc" "net/http" "sync" + "time" ) -// ID declares public service name. -const ID = "metrics" +const ( + // ID declares public service name. + ID = "metrics" + // maxHeaderSize declares max header size for prometheus server + maxHeaderSize = 1024 * 1024 * 100 // 104MB +) // Service to manage application metrics using Prometheus. type Service struct { @@ -76,18 +81,23 @@ func (s *Service) Serve() error { } s.mu.Lock() - s.http = &http.Server{Addr: s.cfg.Address, Handler: promhttp.HandlerFor( - s.registry, - promhttp.HandlerOpts{}, - )} + s.http = &http.Server{ + Addr: s.cfg.Address, + Handler: promhttp.HandlerFor(s.registry, promhttp.HandlerOpts{}, ), + IdleTimeout: time.Hour * 24, + ReadTimeout: time.Minute * 60, + MaxHeaderBytes: maxHeaderSize, + ReadHeaderTimeout: time.Minute * 60, + WriteTimeout: time.Minute * 60, + } s.mu.Unlock() err = s.http.ListenAndServe() - if err == nil || err == http.ErrServerClosed { - return nil + if err != nil && err != http.ErrServerClosed { + return err } - return err + return nil } // Stop prometheus metrics service. |