diff options
author | Alex Price <[email protected]> | 2019-10-12 15:02:57 +1100 |
---|---|---|
committer | Alex Price <[email protected]> | 2019-10-12 15:38:40 +1100 |
commit | c9fdfb6e9484b9aa45e20a90e78a23d5d129308c (patch) | |
tree | 61117f89311b7b8ec2cc680128ecdd1d27d6d94e | |
parent | 8a840c40828c1fb31c69fc846a85738ddef0a7c7 (diff) |
adds a health service for determining the status of the workers
This commit adds a health service which ensures that at least
one worker is active. Uses `IsActive()` to determine if the
worker is ready. The health service runs on a separate address.
Will return an HTTP 200 if healthy, HTTP 500 otherwise.
Fixes #192
Signed-off-by: Alex Price <[email protected]>
-rw-r--r-- | .rr.yaml | 5 | ||||
-rw-r--r-- | .travis.yml | 2 | ||||
-rw-r--r-- | cmd/rr/main.go | 2 | ||||
-rw-r--r-- | service/health/config.go | 32 | ||||
-rw-r--r-- | service/health/config_test.go | 43 | ||||
-rw-r--r-- | service/health/service.go | 88 | ||||
-rw-r--r-- | service/health/service_test.go | 279 | ||||
-rw-r--r-- | state.go | 3 |
8 files changed, 454 insertions, 0 deletions
@@ -155,3 +155,8 @@ static: # list of extensions for forbid for serving. forbid: [".php", ".htaccess"] + +# health service configuration +health: + # http host to serve health requests. + address: localhost:2113
\ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 9ae4200a..c061338c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,6 +26,7 @@ script: - go test ./service/limit -race -v -coverprofile=limit.txt -covermode=atomic - go test ./service/headers -race -v -coverprofile=headers.txt -covermode=atomic - go test ./service/metrics -race -v -coverprofile=metrics.txt -covermode=atomic + - go test ./service/health -race -v -coverprofile=health.txt -covermode=atomic after_success: - bash <(curl -s https://codecov.io/bash) -f lib.txt @@ -38,6 +39,7 @@ after_success: - bash <(curl -s https://codecov.io/bash) -f limit.txt - bash <(curl -s https://codecov.io/bash) -f headers.txt - bash <(curl -s https://codecov.io/bash) -f metrics.txt + - bash <(curl -s https://codecov.io/bash) -f health.txt jobs: include: diff --git a/cmd/rr/main.go b/cmd/rr/main.go index 6fb10ba6..fc02c4d4 100644 --- a/cmd/rr/main.go +++ b/cmd/rr/main.go @@ -28,6 +28,7 @@ import ( // services (plugins) "github.com/spiral/roadrunner/service/env" "github.com/spiral/roadrunner/service/headers" + "github.com/spiral/roadrunner/service/health" "github.com/spiral/roadrunner/service/http" "github.com/spiral/roadrunner/service/limit" "github.com/spiral/roadrunner/service/metrics" @@ -47,6 +48,7 @@ func main() { rr.Container.Register(headers.ID, &headers.Service{}) rr.Container.Register(static.ID, &static.Service{}) rr.Container.Register(limit.ID, &limit.Service{}) + rr.Container.Register(health.ID, &health.Service{}) // you can register additional commands using cmd.CLI rr.Execute() diff --git a/service/health/config.go b/service/health/config.go new file mode 100644 index 00000000..60a52d6e --- /dev/null +++ b/service/health/config.go @@ -0,0 +1,32 @@ +package health + +import ( + "errors" + "strings" + + "github.com/spiral/roadrunner/service" +) + +// Config configures the health service +type Config struct { + // Address to listen on + Address string +} + +// Hydrate the config +func (c *Config) 
Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + return c.Valid() +} + +// Valid validates the configuration. +func (c *Config) Valid() error { + // Validate the address + if c.Address != "" && !strings.Contains(c.Address, ":") { + return errors.New("malformed http server address") + } + + return nil +} diff --git a/service/health/config_test.go b/service/health/config_test.go new file mode 100644 index 00000000..9068f2ca --- /dev/null +++ b/service/health/config_test.go @@ -0,0 +1,43 @@ +package health + +import ( + "encoding/json" + "testing" + + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error1(t *testing.T) { + cfg := &mockCfg{`{"address": "localhost:8080"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + assert.Equal(t, "localhost:8080", c.Address) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Valid1(t *testing.T) { + cfg := &mockCfg{`{"address": "localhost"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Valid2(t *testing.T) { + cfg := &mockCfg{`{"address": ":1111"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} diff --git a/service/health/service.go b/service/health/service.go new file mode 100644 index 00000000..c0be68e0 --- /dev/null +++ b/service/health/service.go @@ -0,0 +1,88 @@ +package health + +import ( + "context" + "net/http" + "sync" + + rrhttp "github.com/spiral/roadrunner/service/http" +) + +// ID declares the public service name +const ID = "health" + +// Service to serve an endpoint for checking the health of the worker pool +type Service 
struct { + cfg *Config + mu sync.Mutex + http *http.Server + httpService *rrhttp.Service +} + +// Init health service +func (s *Service) Init(cfg *Config, r *rrhttp.Service) (bool, error) { + // Ensure the httpService is set + if r == nil { + return false, nil + } + + s.cfg = cfg + s.httpService = r + return true, nil +} + +// Serve the health endpoint +func (s *Service) Serve() error { + // Configure and start the http server + s.mu.Lock() + s.http = &http.Server{Addr: s.cfg.Address, Handler: s} + s.mu.Unlock() + return s.http.ListenAndServe() +} + +// Stop the health endpoint +func (s *Service) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.http != nil { + // gracefully stop the server + go s.http.Shutdown(context.Background()) + } +} + +// ServeHTTP returns the health of the pool of workers +func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + status := http.StatusOK + if !s.isHealthy() { + status = http.StatusInternalServerError + } + w.WriteHeader(status) +} + +// isHealthy checks the server, pool and ensures at least one worker is active +func (s *Service) isHealthy() bool { + httpService := s.httpService + if httpService == nil { + return false + } + + server := httpService.Server() + if server == nil { + return false + } + + pool := server.Pool() + if pool == nil { + return false + } + + // Ensure at least one worker is active + for _, w := range pool.Workers() { + if w.State().IsActive() { + return true + } + } + + return false +} diff --git a/service/health/service_test.go b/service/health/service_test.go new file mode 100644 index 00000000..3e86b0bd --- /dev/null +++ b/service/health/service_test.go @@ -0,0 +1,279 @@ +package health + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + rrhttp "github.com/spiral/roadrunner/service/http" + "github.com/stretchr/testify/assert" +) + +type 
testCfg struct { + healthCfg string + httpCfg string + target string +} + +func (cfg *testCfg) Get(name string) service.Config { + if name == ID { + return &testCfg{target: cfg.healthCfg} + } + + if name == rrhttp.ID { + return &testCfg{target: cfg.httpCfg} + } + + return nil +} + +func (cfg *testCfg) Unmarshal(out interface{}) error { + err := json.Unmarshal([]byte(cfg.target), out) + return err +} + +func TestService_Serve(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 500) + defer c.Stop() + + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, res.StatusCode) + +} + +func TestService_Serve_DeadWorker(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go 
func() { c.Serve() }() + time.Sleep(time.Millisecond * 500) + defer c.Stop() + + // Kill the worker + httpSvc := hS.(*rrhttp.Service) + httpSvc.Server().Workers()[0].Kill() + + // Check health check + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusInternalServerError, res.StatusCode) +} + +func TestService_Serve_DeadWorkerStillHealthy(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 3} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go func() { c.Serve() }() + time.Sleep(time.Second * 1) + defer c.Stop() + + // Kill one of the workers + httpSvc := hS.(*rrhttp.Service) + httpSvc.Server().Workers()[0].Kill() + + // Check health check + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, res.StatusCode) +} + +func TestService_Serve_NoHTTPService(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusInactive, status) +} + +func TestService_Serve_NoServer(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + healthSvc := &Service{} + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, 
&rrhttp.Service{}) + c.Register(ID, healthSvc) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 500) + defer c.Stop() + + // Set the httpService to nil + healthSvc.httpService = nil + + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusInternalServerError, res.StatusCode) +} + +func TestService_Serve_NoPool(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + httpSvc := &rrhttp.Service{} + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, httpSvc) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 500) + defer c.Stop() + + // Stop the pool + httpSvc.Server().Stop() + + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusInternalServerError, res.StatusCode) +} + +// get request and return body +func get(url string) (string, *http.Response, error) { + r, err := http.Get(url) + if err != nil { + return "", nil, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) 
+ return string(b), r, err +} @@ -14,6 +14,9 @@ type State interface { // NumJobs shows how many times worker was invoked NumExecs() int64 + + // IsActive returns true if worker not Inactive or Stopped + IsActive() bool } const ( |