diff options
author | Alex Price <[email protected]> | 2019-10-12 15:02:57 +1100 |
---|---|---|
committer | Alex Price <[email protected]> | 2019-10-12 15:38:40 +1100 |
commit | c9fdfb6e9484b9aa45e20a90e78a23d5d129308c (patch) | |
tree | 61117f89311b7b8ec2cc680128ecdd1d27d6d94e /service/health | |
parent | 8a840c40828c1fb31c69fc846a85738ddef0a7c7 (diff) |
adds a health service for determining the status of the workers
This commit adds a health service which ensures that at least
one worker is active. Uses `isActive()` to determine if the
worker is ready. The health service runs on a seperate address.
Will return a HTTP 200 if health, HTTP 500 otherwise.
Fixes #192
Signed-off-by: Alex Price <[email protected]>
Diffstat (limited to 'service/health')
-rw-r--r-- | service/health/config.go | 32 | ||||
-rw-r--r-- | service/health/config_test.go | 43 | ||||
-rw-r--r-- | service/health/service.go | 88 | ||||
-rw-r--r-- | service/health/service_test.go | 279 |
4 files changed, 442 insertions, 0 deletions
diff --git a/service/health/config.go b/service/health/config.go new file mode 100644 index 00000000..60a52d6e --- /dev/null +++ b/service/health/config.go @@ -0,0 +1,32 @@ +package health + +import ( + "errors" + "strings" + + "github.com/spiral/roadrunner/service" +) + +// Config configures the health service +type Config struct { + // Address to listen on + Address string +} + +// Hydrate the config +func (c *Config) Hydrate(cfg service.Config) error { + if err := cfg.Unmarshal(c); err != nil { + return err + } + return c.Valid() +} + +// Valid validates the configuration. +func (c *Config) Valid() error { + // Validate the address + if c.Address != "" && !strings.Contains(c.Address, ":") { + return errors.New("malformed http server address") + } + + return nil +} diff --git a/service/health/config_test.go b/service/health/config_test.go new file mode 100644 index 00000000..9068f2ca --- /dev/null +++ b/service/health/config_test.go @@ -0,0 +1,43 @@ +package health + +import ( + "encoding/json" + "testing" + + "github.com/spiral/roadrunner/service" + "github.com/stretchr/testify/assert" +) + +type mockCfg struct{ cfg string } + +func (cfg *mockCfg) Get(name string) service.Config { return nil } +func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) } + +func Test_Config_Hydrate_Error1(t *testing.T) { + cfg := &mockCfg{`{"address": "localhost:8080"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) + assert.Equal(t, "localhost:8080", c.Address) +} + +func Test_Config_Hydrate_Error2(t *testing.T) { + cfg := &mockCfg{`{"dir": "/dir/"`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Valid1(t *testing.T) { + cfg := &mockCfg{`{"address": "localhost"}`} + c := &Config{} + + assert.Error(t, c.Hydrate(cfg)) +} + +func Test_Config_Hydrate_Valid2(t *testing.T) { + cfg := &mockCfg{`{"address": ":1111"}`} + c := &Config{} + + assert.NoError(t, c.Hydrate(cfg)) +} diff --git a/service/health/service.go b/service/health/service.go new file mode 100644 index 00000000..c0be68e0 --- /dev/null +++ b/service/health/service.go @@ -0,0 +1,88 @@ +package health + +import ( + "context" + "net/http" + "sync" + + rrhttp "github.com/spiral/roadrunner/service/http" +) + +// ID declares the public service name +const ID = "health" + +// Service to serve an endpoint for checking the health of the worker pool +type Service struct { + cfg *Config + mu sync.Mutex + http *http.Server + httpService *rrhttp.Service +} + +// Init health service +func (s *Service) Init(cfg *Config, r *rrhttp.Service) (bool, error) { + // Ensure the httpService is set + if r == nil { + return false, nil + } + + s.cfg = cfg + s.httpService = r + return true, nil +} + +// Serve the health endpoint +func (s *Service) Serve() error { + // Configure and start the http server + s.mu.Lock() + s.http = &http.Server{Addr: s.cfg.Address, Handler: s} + s.mu.Unlock() + return s.http.ListenAndServe() +} + +// Stop the health endpoint +func (s *Service) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.http != nil { + // gracefully stop the server + go s.http.Shutdown(context.Background()) + } +} + +// ServeHTTP returns the health of the pool of workers +func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { + status := http.StatusOK + if !s.isHealthy() { + status = http.StatusInternalServerError + } + w.WriteHeader(status) +} + +// isHealthy checks the server, pool and ensures at least one worker is active +func (s *Service) isHealthy() bool { + httpService := s.httpService + if httpService == nil { + return false + } + + server := httpService.Server() + if server == nil { + return false + } + + pool := server.Pool() + if pool == nil { + return false + } + + // Ensure at least one worker is active + for _, w := range pool.Workers() { + if w.State().IsActive() { + return true + } + } + + return false +} diff --git a/service/health/service_test.go b/service/health/service_test.go new file mode 100644 index 00000000..3e86b0bd --- /dev/null +++ b/service/health/service_test.go @@ -0,0 +1,279 @@ +package health + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus/hooks/test" + "github.com/spiral/roadrunner/service" + rrhttp "github.com/spiral/roadrunner/service/http" + "github.com/stretchr/testify/assert" +) + +type testCfg struct { + healthCfg string + httpCfg string + target string +} + +func (cfg *testCfg) Get(name string) service.Config { + if name == ID { + return &testCfg{target: cfg.healthCfg} + } + + if name == rrhttp.ID { + return &testCfg{target: cfg.httpCfg} + } + + return nil +} + +func (cfg *testCfg) Unmarshal(out interface{}) error { + err := json.Unmarshal([]byte(cfg.target), out) + return err +} + +func TestService_Serve(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 500) + defer c.Stop() + + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, res.StatusCode) + +} + +func TestService_Serve_DeadWorker(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 500) + defer c.Stop() + + // Kill the worker + httpSvc := hS.(*rrhttp.Service) + httpSvc.Server().Workers()[0].Kill() + + // Check health check + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusInternalServerError, res.StatusCode) +} + +func TestService_Serve_DeadWorkerStillHealthy(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 3} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go func() { c.Serve() }() + time.Sleep(time.Second * 1) + defer c.Stop() + + // Kill one of the workers + httpSvc := hS.(*rrhttp.Service) + httpSvc.Server().Workers()[0].Kill() + + // Check health check + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, res.StatusCode) +} + +func TestService_Serve_NoHTTPService(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + c := service.NewContainer(logger) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusInactive, status) +} + +func TestService_Serve_NoServer(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + healthSvc := &Service{} + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, &rrhttp.Service{}) + c.Register(ID, healthSvc) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 500) + defer c.Stop() + + // Set the httpService to nil + healthSvc.httpService = nil + + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusInternalServerError, res.StatusCode) +} + +func TestService_Serve_NoPool(t *testing.T) { + logger, _ := test.NewNullLogger() + logger.SetLevel(logrus.DebugLevel) + + httpSvc := &rrhttp.Service{} + + c := service.NewContainer(logger) + c.Register(rrhttp.ID, httpSvc) + c.Register(ID, &Service{}) + + assert.NoError(t, c.Init(&testCfg{ + healthCfg: `{ + "address": "localhost:2116" + }`, + httpCfg: `{ + "address": "localhost:2115", + "workers":{ + "command": "php ../../tests/http/client.php echo pipes", + "pool": {"numWorkers": 1} + } + }`, + })) + + s, status := c.Get(ID) + assert.NotNil(t, s) + assert.Equal(t, service.StatusOK, status) + + hS, httpStatus := c.Get(rrhttp.ID) + assert.NotNil(t, hS) + assert.Equal(t, service.StatusOK, httpStatus) + + go func() { c.Serve() }() + time.Sleep(time.Millisecond * 500) + defer c.Stop() + + // Stop the pool + httpSvc.Server().Stop() + + _, res, err := get("http://localhost:2116/") + assert.NoError(t, err) + assert.Equal(t, http.StatusInternalServerError, res.StatusCode) +} + +// get request and return body +func get(url string) (string, *http.Response, error) { + r, err := http.Get(url) + if err != nil { + return "", nil, err + } + defer r.Body.Close() + + b, err := ioutil.ReadAll(r.Body) + return string(b), r, err +} |