diff options
-rw-r--r-- | CHANGELOG.md | 3 | ||||
-rw-r--r-- | go.mod | 6 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory.go | 2 | ||||
-rw-r--r-- | plugins/http/plugin.go | 20 | ||||
-rw-r--r-- | plugins/status/interface.go | 7 | ||||
-rw-r--r-- | plugins/status/plugin.go | 92 | ||||
-rw-r--r-- | plugins/status/rpc.go | 20 | ||||
-rwxr-xr-x | tests/plugins/status/configs/.rr-ready-init.yaml | 28 | ||||
-rw-r--r-- | tests/plugins/status/plugin_test.go | 198 |
9 files changed, 346 insertions, 30 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index f014a692..83d628e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ v2.0.4 (06.04.2021) - 🐛 Fix: bug with the temporal worker which does not follow general graceful shutdown period. +## 🔥 New: +- 🆕 Add Readiness probe check. The `status` plugin provides `/ready` endpoint which return the `204` HTTP code if there are no workers in the `Ready` state and `200 OK` status if there are at least 1 worker in the `Ready` state. + v2.0.3 (29.03.2021) ------------------- @@ -21,7 +21,7 @@ require ( github.com/shirou/gopsutil v3.21.3+incompatible github.com/spf13/viper v1.7.1 // SPIRAL ==== - github.com/spiral/endure v1.0.0 + github.com/spiral/endure v1.0.1 github.com/spiral/errors v1.0.9 github.com/spiral/goridge/v3 v3.0.1 // =========== @@ -36,3 +36,7 @@ require ( golang.org/x/sync v0.0.0-20201207232520-09787c993a3a golang.org/x/sys v0.0.0-20210309074719-68d13333faf2 ) + +replace ( + github.com/spiral/roadrunner/v2 => ../roadrunner +) diff --git a/pkg/transport/socket/socket_factory.go b/pkg/transport/socket/socket_factory.go index 990eb384..ee63c9c3 100755 --- a/pkg/transport/socket/socket_factory.go +++ b/pkg/transport/socket/socket_factory.go @@ -33,8 +33,6 @@ type Factory struct { ErrCh chan error } -// todo: review - // NewSocketServer returns Factory attached to a given socket listener. // tout specifies for how long factory should serve for incoming relay connection func NewSocketServer(ls net.Listener, tout time.Duration) *Factory { diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 82cf76ed..13a76329 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -405,7 +405,25 @@ func (s *Plugin) Status() status.Status { } // if there are no workers, threat this as error return status.Status{ - Code: http.StatusInternalServerError, + Code: http.StatusNoContent, + } +} + +// Status return status of the particular plugin +func (s *Plugin) Ready() status.Status { + workers := s.Workers() + for i := 0; i < len(workers); i++ { + // If state of the worker is ready (at least 1) + // we assume, that plugin's worker pool is ready + if workers[i].State().Value() == worker.StateReady { + return status.Status{ + Code: http.StatusOK, + } + } + } + // if there are no workers, threat this as no content error + return status.Status{ + Code: http.StatusNoContent, } } diff --git a/plugins/status/interface.go b/plugins/status/interface.go index 0a92bc52..9d5a13af 100644 --- a/plugins/status/interface.go +++ b/plugins/status/interface.go @@ -9,3 +9,10 @@ type Status struct { type Checker interface { Status() Status } + +// Readiness interface used to get readiness status from the plugin +// that means, that worker poll inside the plugin has 1+ plugins which are ready to work +// at the particular moment +type Readiness interface { + Ready() Status +} diff --git a/plugins/status/plugin.go b/plugins/status/plugin.go index 6fbe67cf..693440bf 100644 --- a/plugins/status/plugin.go +++ b/plugins/status/plugin.go @@ -18,10 +18,13 @@ const ( ) type Plugin struct { - registry map[string]Checker - server *fiber.App - log logger.Logger - cfg *Config + // plugins which needs to be checked just as Status + statusRegistry map[string]Checker + // plugins which needs to send Readiness status + readyRegistry map[string]Readiness + server *fiber.App + log logger.Logger + cfg *Config } func (c *Plugin) Init(log logger.Logger, cfg config.Configurer) error { @@ -34,7 +37,9 @@ func (c *Plugin) Init(log logger.Logger, cfg config.Configurer) error { return errors.E(op, errors.Disabled, err) } - c.registry = make(map[string]Checker) + c.readyRegistry = make(map[string]Readiness) + c.statusRegistry = make(map[string]Checker) + c.log = log return nil } @@ -49,6 +54,7 @@ func (c *Plugin) Serve() chan error { }) c.server.Use("/health", c.healthHandler) + c.server.Use("/ready", c.readinessHandler) go func() { err := c.server.Listen(c.cfg.Address) @@ -69,10 +75,11 @@ func (c *Plugin) Stop() error { return nil } -// Reset named service. -func (c *Plugin) Status(name string) (Status, error) { +// status returns a Checker interface implementation +// Reset named service. This is not an Status interface implementation +func (c *Plugin) status(name string) (Status, error) { const op = errors.Op("checker_plugin_status") - svc, ok := c.registry[name] + svc, ok := c.statusRegistry[name] if !ok { return Status{}, errors.E(op, errors.Errorf("no such service: %s", name)) } @@ -80,16 +87,34 @@ func (c *Plugin) Status(name string) (Status, error) { return svc.Status(), nil } -// CollectTarget collecting services which can provide Status. -func (c *Plugin) CollectTarget(name endure.Named, r Checker) error { - c.registry[name.Name()] = r +// ready used to provide a readiness check for the plugin +func (c *Plugin) ready(name string) (Status, error) { + const op = errors.Op("checker_plugin_ready") + svc, ok := c.readyRegistry[name] + if !ok { + return Status{}, errors.E(op, errors.Errorf("no such service: %s", name)) + } + + return svc.Ready(), nil +} + +// CollectCheckerImpls collects services which can provide Status. +func (c *Plugin) CollectCheckerImpls(name endure.Named, r Checker) error { + c.statusRegistry[name.Name()] = r + return nil +} + +// CollectReadinessImpls collects services which can provide Readiness check. +func (c *Plugin) CollectReadinessImpls(name endure.Named, r Readiness) error { + c.readyRegistry[name.Name()] = r return nil } // Collects declares services to be collected. func (c *Plugin) Collects() []interface{} { return []interface{}{ - c.CollectTarget, + c.CollectReadinessImpls, + c.CollectCheckerImpls, } } @@ -109,6 +134,35 @@ type Plugins struct { const template string = "Service: %s: Status: %d\n" +func (c *Plugin) readinessHandler(ctx *fiber.Ctx) error { + const op = errors.Op("checker_plugin_readiness_handler") + plugins := &Plugins{} + err := ctx.QueryParser(plugins) + if err != nil { + return errors.E(op, err) + } + + if len(plugins.Plugins) == 0 { + ctx.Status(http.StatusOK) + _, _ = ctx.WriteString("No plugins provided in query. Query should be in form of: ready?plugin=plugin1&plugin=plugin2 \n") + return nil + } + + // iterate over all provided plugins + for i := 0; i < len(plugins.Plugins); i++ { + // check if the plugin exists + if plugin, ok := c.readyRegistry[plugins.Plugins[i]]; ok { + st := plugin.Ready() + _, _ = ctx.WriteString(fmt.Sprintf(template, plugins.Plugins[i], st.Code)) + } else { + _, _ = ctx.WriteString(fmt.Sprintf("Service: %s not found", plugins.Plugins[i])) + } + } + + ctx.Status(http.StatusOK) + return nil +} + func (c *Plugin) healthHandler(ctx *fiber.Ctx) error { const op = errors.Op("checker_plugin_health_handler") plugins := &Plugins{} @@ -123,26 +177,16 @@ func (c *Plugin) healthHandler(ctx *fiber.Ctx) error { return nil } - failed := false // iterate over all provided plugins for i := 0; i < len(plugins.Plugins); i++ { // check if the plugin exists - if plugin, ok := c.registry[plugins.Plugins[i]]; ok { + if plugin, ok := c.statusRegistry[plugins.Plugins[i]]; ok { st := plugin.Status() - if st.Code >= 500 { - failed = true - continue - } else if st.Code >= 100 && st.Code <= 400 { - _, _ = ctx.WriteString(fmt.Sprintf(template, plugins.Plugins[i], st.Code)) - } + _, _ = ctx.WriteString(fmt.Sprintf(template, plugins.Plugins[i], st.Code)) } else { _, _ = ctx.WriteString(fmt.Sprintf("Service: %s not found", plugins.Plugins[i])) } } - if failed { - ctx.Status(http.StatusInternalServerError) - return nil - } ctx.Status(http.StatusOK) return nil diff --git a/plugins/status/rpc.go b/plugins/status/rpc.go index 396ff451..755a06fa 100644 --- a/plugins/status/rpc.go +++ b/plugins/status/rpc.go @@ -14,7 +14,7 @@ type rpc struct { func (rpc *rpc) Status(service string, status *Status) error { const op = errors.Op("checker_rpc_status") rpc.log.Debug("started Status method", "service", service) - st, err := rpc.srv.Status(service) + st, err := rpc.srv.status(service) if err != nil { return errors.E(op, err) } @@ -22,6 +22,22 @@ func (rpc *rpc) Status(service string, status *Status) error { *status = st rpc.log.Debug("status code", "code", st.Code) - rpc.log.Debug("successfully finished Status method") + rpc.log.Debug("successfully finished the Status method") + return nil +} + +// Status return current status of the provided plugin +func (rpc *rpc) Ready(service string, status *Status) error { + const op = errors.Op("checker_rpc_ready") + rpc.log.Debug("started Ready method", "service", service) + st, err := rpc.srv.ready(service) + if err != nil { + return errors.E(op, err) + } + + *status = st + + rpc.log.Debug("status code", "code", st.Code) + rpc.log.Debug("successfully finished the Ready method") return nil } diff --git a/tests/plugins/status/configs/.rr-ready-init.yaml b/tests/plugins/status/configs/.rr-ready-init.yaml new file mode 100755 index 00000000..da9e84c3 --- /dev/null +++ b/tests/plugins/status/configs/.rr-ready-init.yaml @@ -0,0 +1,28 @@ +rpc: + listen: tcp://127.0.0.1:6007 + +server: + command: "php ../../sleep.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +status: + address: "127.0.0.1:34334" + +logs: + mode: development + level: error +http: + address: 127.0.0.1:11933 + max_request_size: 1024 + middleware: [ "" ] + uploads: + forbid: [ ".php", ".exe", ".bat" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 1 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s diff --git a/tests/plugins/status/plugin_test.go b/tests/plugins/status/plugin_test.go index a7322353..85e07fcf 100644 --- a/tests/plugins/status/plugin_test.go +++ b/tests/plugins/status/plugin_test.go @@ -188,3 +188,201 @@ func checkRPCStatus(t *testing.T) { assert.NoError(t, err) assert.Equal(t, st.Code, 200) } + +func TestReadyHttp(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-status-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &logger.ZapLogger{}, + &server.Plugin{}, + &httpPlugin.Plugin{}, + &status.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second) + t.Run("CheckerGetReadiness", checkHTTPReadiness) + + stopCh <- struct{}{} + wg.Wait() +} + +const resp2 = `Service: http: Status: 204 +Service: rpc not found` + +func checkHTTPReadiness(t *testing.T) { + req, err := http.NewRequest("GET", "http://127.0.0.1:34333/ready?plugin=http&plugin=rpc", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, resp, string(b)) + + err = r.Body.Close() + assert.NoError(t, err) +} + +func TestReadinessRPCWorkerNotReady(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.GracefulShutdownTimeout(time.Second*2)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-ready-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &httpPlugin.Plugin{}, + &status.Plugin{}, + ) + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + assert.NoError(t, err) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout, error here is OK, because in the PHP we are sleeping for the 300s + _ = cont.Stop() + return + } + } + }() + + time.Sleep(time.Second) + t.Run("DoHttpReq", doHTTPReq) + time.Sleep(time.Second * 5) + t.Run("CheckerGetReadiness2", checkHTTPReadiness2) + t.Run("CheckerGetRpcReadiness", checkRPCReadiness) + stopCh <- struct{}{} + wg.Wait() +} + +func doHTTPReq(t *testing.T) { + go func() { + req, err := http.NewRequest("GET", "http://localhost:11933", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, resp2, string(b)) + + err = r.Body.Close() + assert.NoError(t, err) + }() +} + +func checkHTTPReadiness2(t *testing.T) { + req, err := http.NewRequest("GET", "http://127.0.0.1:34334/ready?plugin=http&plugin=rpc", nil) + assert.NoError(t, err) + + r, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + b, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, 200, r.StatusCode) + assert.Equal(t, resp2, string(b)) + + err = r.Body.Close() + assert.NoError(t, err) +} + +func checkRPCReadiness(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:6007") + assert.NoError(t, err) + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + st := &status.Status{} + + err = client.Call("status.Ready", "http", &st) + assert.NoError(t, err) + assert.Equal(t, st.Code, 204) +} |