summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Price <[email protected]>2019-10-12 15:02:57 +1100
committerAlex Price <[email protected]>2019-10-12 15:38:40 +1100
commitc9fdfb6e9484b9aa45e20a90e78a23d5d129308c (patch)
tree61117f89311b7b8ec2cc680128ecdd1d27d6d94e
parent8a840c40828c1fb31c69fc846a85738ddef0a7c7 (diff)
adds a health service for determining the status of the workers
This commit adds a health service which ensures that at least one worker is active. Uses `isActive()` to determine if the worker is ready. The health service runs on a seperate address. Will return a HTTP 200 if health, HTTP 500 otherwise. Fixes #192 Signed-off-by: Alex Price <[email protected]>
-rw-r--r--.rr.yaml5
-rw-r--r--.travis.yml2
-rw-r--r--cmd/rr/main.go2
-rw-r--r--service/health/config.go32
-rw-r--r--service/health/config_test.go43
-rw-r--r--service/health/service.go88
-rw-r--r--service/health/service_test.go279
-rw-r--r--state.go3
8 files changed, 454 insertions, 0 deletions
diff --git a/.rr.yaml b/.rr.yaml
index 151ed914..5792b881 100644
--- a/.rr.yaml
+++ b/.rr.yaml
@@ -155,3 +155,8 @@ static:
# list of extensions for forbid for serving.
forbid: [".php", ".htaccess"]
+
+# health service configuration
+health:
+ # http host to serve health requests.
+ address: localhost:2113 \ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index 9ae4200a..c061338c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -26,6 +26,7 @@ script:
- go test ./service/limit -race -v -coverprofile=limit.txt -covermode=atomic
- go test ./service/headers -race -v -coverprofile=headers.txt -covermode=atomic
- go test ./service/metrics -race -v -coverprofile=metrics.txt -covermode=atomic
+ - go test ./service/health -race -v -coverprofile=health.txt -covermode=atomic
after_success:
- bash <(curl -s https://codecov.io/bash) -f lib.txt
@@ -38,6 +39,7 @@ after_success:
- bash <(curl -s https://codecov.io/bash) -f limit.txt
- bash <(curl -s https://codecov.io/bash) -f headers.txt
- bash <(curl -s https://codecov.io/bash) -f metrics.txt
+ - bash <(curl -s https://codecov.io/bash) -f health.txt
jobs:
include:
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index 6fb10ba6..fc02c4d4 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -28,6 +28,7 @@ import (
// services (plugins)
"github.com/spiral/roadrunner/service/env"
"github.com/spiral/roadrunner/service/headers"
+ "github.com/spiral/roadrunner/service/health"
"github.com/spiral/roadrunner/service/http"
"github.com/spiral/roadrunner/service/limit"
"github.com/spiral/roadrunner/service/metrics"
@@ -47,6 +48,7 @@ func main() {
rr.Container.Register(headers.ID, &headers.Service{})
rr.Container.Register(static.ID, &static.Service{})
rr.Container.Register(limit.ID, &limit.Service{})
+ rr.Container.Register(health.ID, &health.Service{})
// you can register additional commands using cmd.CLI
rr.Execute()
diff --git a/service/health/config.go b/service/health/config.go
new file mode 100644
index 00000000..60a52d6e
--- /dev/null
+++ b/service/health/config.go
@@ -0,0 +1,32 @@
+package health
+
+import (
+ "errors"
+ "strings"
+
+ "github.com/spiral/roadrunner/service"
+)
+
+// Config configures the health service
+type Config struct {
+ // Address to listen on
+ Address string
+}
+
+// Hydrate the config
+func (c *Config) Hydrate(cfg service.Config) error {
+ if err := cfg.Unmarshal(c); err != nil {
+ return err
+ }
+ return c.Valid()
+}
+
+// Valid validates the configuration.
+func (c *Config) Valid() error {
+ // Validate the address
+ if c.Address != "" && !strings.Contains(c.Address, ":") {
+ return errors.New("malformed http server address")
+ }
+
+ return nil
+}
diff --git a/service/health/config_test.go b/service/health/config_test.go
new file mode 100644
index 00000000..9068f2ca
--- /dev/null
+++ b/service/health/config_test.go
@@ -0,0 +1,43 @@
+package health
+
+import (
+ "encoding/json"
+ "testing"
+
+ "github.com/spiral/roadrunner/service"
+ "github.com/stretchr/testify/assert"
+)
+
+type mockCfg struct{ cfg string }
+
+func (cfg *mockCfg) Get(name string) service.Config { return nil }
+func (cfg *mockCfg) Unmarshal(out interface{}) error { return json.Unmarshal([]byte(cfg.cfg), out) }
+
+func Test_Config_Hydrate_Error1(t *testing.T) {
+ cfg := &mockCfg{`{"address": "localhost:8080"}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+ assert.Equal(t, "localhost:8080", c.Address)
+}
+
+func Test_Config_Hydrate_Error2(t *testing.T) {
+ cfg := &mockCfg{`{"dir": "/dir/"`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Valid1(t *testing.T) {
+ cfg := &mockCfg{`{"address": "localhost"}`}
+ c := &Config{}
+
+ assert.Error(t, c.Hydrate(cfg))
+}
+
+func Test_Config_Hydrate_Valid2(t *testing.T) {
+ cfg := &mockCfg{`{"address": ":1111"}`}
+ c := &Config{}
+
+ assert.NoError(t, c.Hydrate(cfg))
+}
diff --git a/service/health/service.go b/service/health/service.go
new file mode 100644
index 00000000..c0be68e0
--- /dev/null
+++ b/service/health/service.go
@@ -0,0 +1,88 @@
+package health
+
+import (
+ "context"
+ "net/http"
+ "sync"
+
+ rrhttp "github.com/spiral/roadrunner/service/http"
+)
+
+// ID declares the public service name
+const ID = "health"
+
+// Service to serve an endpoint for checking the health of the worker pool
+type Service struct {
+ cfg *Config
+ mu sync.Mutex
+ http *http.Server
+ httpService *rrhttp.Service
+}
+
+// Init health service
+func (s *Service) Init(cfg *Config, r *rrhttp.Service) (bool, error) {
+ // Ensure the httpService is set
+ if r == nil {
+ return false, nil
+ }
+
+ s.cfg = cfg
+ s.httpService = r
+ return true, nil
+}
+
+// Serve the health endpoint
+func (s *Service) Serve() error {
+ // Configure and start the http server
+ s.mu.Lock()
+ s.http = &http.Server{Addr: s.cfg.Address, Handler: s}
+ s.mu.Unlock()
+ return s.http.ListenAndServe()
+}
+
+// Stop the health endpoint
+func (s *Service) Stop() {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.http != nil {
+ // gracefully stop the server
+ go s.http.Shutdown(context.Background())
+ }
+}
+
+// ServeHTTP returns the health of the pool of workers
+func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ status := http.StatusOK
+ if !s.isHealthy() {
+ status = http.StatusInternalServerError
+ }
+ w.WriteHeader(status)
+}
+
+// isHealthy checks the server, pool and ensures at least one worker is active
+func (s *Service) isHealthy() bool {
+ httpService := s.httpService
+ if httpService == nil {
+ return false
+ }
+
+ server := httpService.Server()
+ if server == nil {
+ return false
+ }
+
+ pool := server.Pool()
+ if pool == nil {
+ return false
+ }
+
+ // Ensure at least one worker is active
+ for _, w := range pool.Workers() {
+ if w.State().IsActive() {
+ return true
+ }
+ }
+
+ return false
+}
diff --git a/service/health/service_test.go b/service/health/service_test.go
new file mode 100644
index 00000000..3e86b0bd
--- /dev/null
+++ b/service/health/service_test.go
@@ -0,0 +1,279 @@
+package health
+
+import (
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+ "testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus/hooks/test"
+ "github.com/spiral/roadrunner/service"
+ rrhttp "github.com/spiral/roadrunner/service/http"
+ "github.com/stretchr/testify/assert"
+)
+
+type testCfg struct {
+ healthCfg string
+ httpCfg string
+ target string
+}
+
+func (cfg *testCfg) Get(name string) service.Config {
+ if name == ID {
+ return &testCfg{target: cfg.healthCfg}
+ }
+
+ if name == rrhttp.ID {
+ return &testCfg{target: cfg.httpCfg}
+ }
+
+ return nil
+}
+
+func (cfg *testCfg) Unmarshal(out interface{}) error {
+ err := json.Unmarshal([]byte(cfg.target), out)
+ return err
+}
+
+func TestService_Serve(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ healthCfg: `{
+ "address": "localhost:2116"
+ }`,
+ httpCfg: `{
+ "address": "localhost:2115",
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`,
+ }))
+
+ s, status := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, status)
+
+ hS, httpStatus := c.Get(rrhttp.ID)
+ assert.NotNil(t, hS)
+ assert.Equal(t, service.StatusOK, httpStatus)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 500)
+ defer c.Stop()
+
+ _, res, err := get("http://localhost:2116/")
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, res.StatusCode)
+
+}
+
+func TestService_Serve_DeadWorker(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ healthCfg: `{
+ "address": "localhost:2116"
+ }`,
+ httpCfg: `{
+ "address": "localhost:2115",
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`,
+ }))
+
+ s, status := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, status)
+
+ hS, httpStatus := c.Get(rrhttp.ID)
+ assert.NotNil(t, hS)
+ assert.Equal(t, service.StatusOK, httpStatus)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 500)
+ defer c.Stop()
+
+ // Kill the worker
+ httpSvc := hS.(*rrhttp.Service)
+ httpSvc.Server().Workers()[0].Kill()
+
+ // Check health check
+ _, res, err := get("http://localhost:2116/")
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusInternalServerError, res.StatusCode)
+}
+
+func TestService_Serve_DeadWorkerStillHealthy(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ healthCfg: `{
+ "address": "localhost:2116"
+ }`,
+ httpCfg: `{
+ "address": "localhost:2115",
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "pool": {"numWorkers": 3}
+ }
+ }`,
+ }))
+
+ s, status := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, status)
+
+ hS, httpStatus := c.Get(rrhttp.ID)
+ assert.NotNil(t, hS)
+ assert.Equal(t, service.StatusOK, httpStatus)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Second * 1)
+ defer c.Stop()
+
+ // Kill one of the workers
+ httpSvc := hS.(*rrhttp.Service)
+ httpSvc.Server().Workers()[0].Kill()
+
+ // Check health check
+ _, res, err := get("http://localhost:2116/")
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, res.StatusCode)
+}
+
+func TestService_Serve_NoHTTPService(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ c := service.NewContainer(logger)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ healthCfg: `{
+ "address": "localhost:2116"
+ }`,
+ }))
+
+ s, status := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusInactive, status)
+}
+
+func TestService_Serve_NoServer(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ healthSvc := &Service{}
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, &rrhttp.Service{})
+ c.Register(ID, healthSvc)
+
+ assert.NoError(t, c.Init(&testCfg{
+ healthCfg: `{
+ "address": "localhost:2116"
+ }`,
+ httpCfg: `{
+ "address": "localhost:2115",
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`,
+ }))
+
+ s, status := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, status)
+
+ hS, httpStatus := c.Get(rrhttp.ID)
+ assert.NotNil(t, hS)
+ assert.Equal(t, service.StatusOK, httpStatus)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 500)
+ defer c.Stop()
+
+ // Set the httpService to nil
+ healthSvc.httpService = nil
+
+ _, res, err := get("http://localhost:2116/")
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusInternalServerError, res.StatusCode)
+}
+
+func TestService_Serve_NoPool(t *testing.T) {
+ logger, _ := test.NewNullLogger()
+ logger.SetLevel(logrus.DebugLevel)
+
+ httpSvc := &rrhttp.Service{}
+
+ c := service.NewContainer(logger)
+ c.Register(rrhttp.ID, httpSvc)
+ c.Register(ID, &Service{})
+
+ assert.NoError(t, c.Init(&testCfg{
+ healthCfg: `{
+ "address": "localhost:2116"
+ }`,
+ httpCfg: `{
+ "address": "localhost:2115",
+ "workers":{
+ "command": "php ../../tests/http/client.php echo pipes",
+ "pool": {"numWorkers": 1}
+ }
+ }`,
+ }))
+
+ s, status := c.Get(ID)
+ assert.NotNil(t, s)
+ assert.Equal(t, service.StatusOK, status)
+
+ hS, httpStatus := c.Get(rrhttp.ID)
+ assert.NotNil(t, hS)
+ assert.Equal(t, service.StatusOK, httpStatus)
+
+ go func() { c.Serve() }()
+ time.Sleep(time.Millisecond * 500)
+ defer c.Stop()
+
+ // Stop the pool
+ httpSvc.Server().Stop()
+
+ _, res, err := get("http://localhost:2116/")
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusInternalServerError, res.StatusCode)
+}
+
+// get request and return body
+func get(url string) (string, *http.Response, error) {
+ r, err := http.Get(url)
+ if err != nil {
+ return "", nil, err
+ }
+ defer r.Body.Close()
+
+ b, err := ioutil.ReadAll(r.Body)
+ return string(b), r, err
+}
diff --git a/state.go b/state.go
index bf88f012..98451f48 100644
--- a/state.go
+++ b/state.go
@@ -14,6 +14,9 @@ type State interface {
// NumJobs shows how many times worker was invoked
NumExecs() int64
+
+ // IsActive returns true if worker not Inactive or Stopped
+ IsActive() bool
}
const (