summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-03 13:08:38 +0300
committerValery Piashchynski <[email protected]>2020-12-03 13:08:38 +0300
commit31e4bb7b236fe0ae59fd42d66af0d718b67a2a4b (patch)
treecd51f81b20dc32666165ff85d8b8bdad0e6c464c /plugins
parentbdabf97a6463c0e3897063235f3d556d4f75c4bd (diff)
Initial commit of checker plugin
Diffstat (limited to 'plugins')
-rw-r--r--plugins/checker/config.go5
-rw-r--r--plugins/checker/plugin.go138
-rw-r--r--plugins/checker/rpc.go28
-rwxr-xr-xplugins/checker/tests/configs/.rr-checker-init.yaml29
-rw-r--r--plugins/checker/tests/plugin_test.go82
-rw-r--r--plugins/http/plugin.go17
-rw-r--r--plugins/informer/plugin.go2
7 files changed, 300 insertions, 1 deletions
diff --git a/plugins/checker/config.go b/plugins/checker/config.go
new file mode 100644
index 00000000..5f952592
--- /dev/null
+++ b/plugins/checker/config.go
@@ -0,0 +1,5 @@
+package checker
+
+type Config struct {
+ Address string
+}
diff --git a/plugins/checker/plugin.go b/plugins/checker/plugin.go
new file mode 100644
index 00000000..11dce06e
--- /dev/null
+++ b/plugins/checker/plugin.go
@@ -0,0 +1,138 @@
+package checker
+
+import (
+ "fmt"
+ "net/http"
+
+ "github.com/gofiber/fiber/v2"
+ "github.com/gofiber/fiber/v2/middleware/logger"
+ "github.com/spiral/endure"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/status"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+)
+
+const (
+ // PluginName declares public plugin name.
+ PluginName = "status"
+)
+
+type Plugin struct {
+ registry map[string]status.Checker
+ server *fiber.App
+ log log.Logger
+ cfg *Config
+}
+
+func (c *Plugin) Init(log log.Logger, cfg config.Configurer) error {
+ const op = errors.Op("status plugin init")
+ err := cfg.UnmarshalKey(PluginName, &c.cfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ c.registry = make(map[string]status.Checker)
+ c.log = log
+ return nil
+}
+
+// localhost:88294/status/all
+func (c *Plugin) Serve() chan error {
+ errCh := make(chan error, 1)
+ c.server = fiber.New()
+ c.server.Group("/v1", c.healthHandler)
+ c.server.Use(logger.New())
+ c.server.Use("/health", c.healthHandler)
+
+ go func() {
+ err := c.server.Listen(c.cfg.Address)
+ if err != nil {
+ errCh <- err
+ }
+ }()
+
+ return errCh
+}
+
+func (c *Plugin) Stop() error {
+ return c.server.Shutdown()
+}
+
+// Reset named service.
+func (c *Plugin) Status(name string) (status.Status, error) {
+ const op = errors.Op("get status")
+ svc, ok := c.registry[name]
+ if !ok {
+ return status.Status{}, errors.E(op, errors.Errorf("no such service: %s", name))
+ }
+
+ return svc.Status(), nil
+}
+
+// CollectTarget collecting services which can provide Status.
+func (c *Plugin) CollectTarget(name endure.Named, r status.Checker) error {
+ c.registry[name.Name()] = r
+ return nil
+}
+
+// Collects declares services to be collected.
+func (c *Plugin) Collects() []interface{} {
+ return []interface{}{
+ c.CollectTarget,
+ }
+}
+
+// Name of the service.
+func (c *Plugin) Name() string {
+ return PluginName
+}
+
+// RPCService returns associated rpc service.
+func (c *Plugin) RPC() interface{} {
+ return &rpc{srv: c, log: c.log}
+}
+
+type Plugins struct {
+ Plugins []string `query:"service"`
+}
+
+const template string = "Service: %s: Status: %d\n"
+
+func (c *Plugin) healthHandler(ctx *fiber.Ctx) error {
+ const op = errors.Op("health_handler")
+ plugins := &Plugins{}
+ err := ctx.QueryParser(plugins)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ if len(plugins.Plugins) == 0 {
+ ctx.Status(http.StatusOK)
+ _, _ = ctx.WriteString("No plugins provided in query. Query should be in form of: /v1/health?plugin=plugin1&plugin=plugin2 \n")
+ return nil
+ }
+
+ failed := false
+ // iterate over all provided plugins
+ for i := 0; i < len(plugins.Plugins); i++ {
+ // check if the plugin exists
+ if plugin, ok := c.registry[plugins.Plugins[i]]; ok {
+ st := plugin.Status()
+ if st.Code >= 500 {
+ failed = true
+ continue
+ } else if st.Code >= 100 && st.Code <= 400 {
+ _, _ = ctx.WriteString(fmt.Sprintf(template, plugins.Plugins[i], st.Code))
+ }
+ } else {
+ _, _ = ctx.WriteString(fmt.Sprintf("Service: %s not found", plugins.Plugins[i]))
+ }
+ }
+ if failed {
+ ctx.Status(http.StatusInternalServerError)
+ return nil
+ }
+
+ ctx.Status(http.StatusOK)
+ return nil
+}
diff --git a/plugins/checker/rpc.go b/plugins/checker/rpc.go
new file mode 100644
index 00000000..d03d1638
--- /dev/null
+++ b/plugins/checker/rpc.go
@@ -0,0 +1,28 @@
+package checker
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/interfaces/log"
+ "github.com/spiral/roadrunner/v2/interfaces/status"
+)
+
+type rpc struct {
+ srv *Plugin
+ log log.Logger
+}
+
+// Status return current status of the provided plugin
+func (rpc *rpc) Status(service string, status *status.Status) error {
+ const op = errors.Op("status")
+ rpc.log.Debug("started Status method", "service", service)
+ st, err := rpc.srv.Status(service)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ *status = st
+
+ rpc.log.Debug("status code", "code", st.Code)
+ rpc.log.Debug("successfully finished Status method")
+ return nil
+}
diff --git a/plugins/checker/tests/configs/.rr-checker-init.yaml b/plugins/checker/tests/configs/.rr-checker-init.yaml
new file mode 100755
index 00000000..ba008853
--- /dev/null
+++ b/plugins/checker/tests/configs/.rr-checker-init.yaml
@@ -0,0 +1,29 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+ disabled: false
+
+server:
+ command: "php ../../../tests/http/client.php echo pipes"
+ user: ""
+ group: ""
+ env:
+ "RR_HTTP": "true"
+ relay: "pipes"
+ relayTimeout: "20s"
+
+status:
+ address: "127.0.0.1:34333"
+
+http:
+ debug: true
+ address: 127.0.0.1:18903
+ maxRequestSize: 1024
+ middleware: [ "" ]
+ uploads:
+ forbid: [ ".php", ".exe", ".bat" ]
+ trustedSubnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ numWorkers: 2
+ maxJobs: 0
+ allocateTimeout: 60s
+ destroyTimeout: 60s \ No newline at end of file
diff --git a/plugins/checker/tests/plugin_test.go b/plugins/checker/tests/plugin_test.go
new file mode 100644
index 00000000..58cb45a0
--- /dev/null
+++ b/plugins/checker/tests/plugin_test.go
@@ -0,0 +1,82 @@
+package tests
+
+import (
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/plugins/checker"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestStatusInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-checker-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &httpPlugin.Plugin{},
+ &checker.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ tt := time.NewTimer(time.Second * 5)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ wg.Wait()
+}
diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go
index 79e8aa94..371cdb91 100644
--- a/plugins/http/plugin.go
+++ b/plugins/http/plugin.go
@@ -18,6 +18,7 @@ import (
"github.com/spiral/roadrunner/v2"
"github.com/spiral/roadrunner/v2/interfaces/log"
factory "github.com/spiral/roadrunner/v2/interfaces/server"
+ "github.com/spiral/roadrunner/v2/interfaces/status"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
"github.com/spiral/roadrunner/v2/util"
@@ -341,6 +342,22 @@ func (s *Plugin) AddMiddleware(name endure.Named, m Middleware) {
s.mdwr[name.Name()] = m
}
+// Status return status of the particular plugin
+func (s *Plugin) Status() status.Status {
+ workers := s.Workers()
+ for i := 0; i < len(workers); i++ {
+ if workers[i].State().IsActive() {
+ return status.Status{
+ Code: http.StatusOK,
+ }
+ }
+ }
+ // if there are no workers, threat this as error
+ return status.Status{
+ Code: http.StatusInternalServerError,
+ }
+}
+
func (s *Plugin) redirect(w http.ResponseWriter, r *http.Request) bool {
if s.https != nil && r.TLS == nil && s.cfg.SSL.Redirect {
target := &url.URL{
diff --git a/plugins/informer/plugin.go b/plugins/informer/plugin.go
index 09d933fd..f3013394 100644
--- a/plugins/informer/plugin.go
+++ b/plugins/informer/plugin.go
@@ -21,7 +21,7 @@ func (p *Plugin) Init(log log.Logger) error {
return nil
}
-// Reset named service.
+// Workers provides WorkerBase slice with workers for the requested plugin
func (p *Plugin) Workers(name string) ([]roadrunner.WorkerBase, error) {
const op = errors.Op("get workers")
svc, ok := p.registry[name]