summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-09-29 23:37:16 +0300
committerGitHub <[email protected]>2018-09-29 23:37:16 +0300
commit6122fca108c20984732c969fb1ba53cce5b3c44a (patch)
tree40835f46a5c208ea2546b76e3bd9fa05429b405a
parentabe62c0675f839586312cff1c83d6a4cb31dd9d5 (diff)
parenta04b5b33eb30944007973067ec07e9c4a2c464ab (diff)
Merge pull request #39 from spiral/feature/1.3.0v1.2.3
Feature/1.3.0
-rw-r--r--.travis.yml3
-rw-r--r--CHANGELOG.md6
-rwxr-xr-xbuild.sh2
-rw-r--r--cmd/rr/cmd/root.go8
-rw-r--r--cmd/rr/debug/debugger.go120
-rw-r--r--cmd/rr/http/debug.go84
-rw-r--r--cmd/rr/http/reset.go18
-rw-r--r--cmd/rr/http/workers.go71
-rw-r--r--cmd/rr/main.go2
-rw-r--r--cmd/util/cprint.go (renamed from cmd/rr/utils/cprint.go)2
-rw-r--r--cmd/util/debug.go57
-rw-r--r--cmd/util/rpc.go18
-rw-r--r--cmd/util/table.go58
-rw-r--r--service/container.go9
-rw-r--r--service/container_test.go35
-rw-r--r--service/http/handler.go5
-rw-r--r--service/http/request.go6
-rw-r--r--service/http/rpc.go33
-rw-r--r--service/http/uploads.go3
-rw-r--r--util/state.go62
20 files changed, 324 insertions, 278 deletions
diff --git a/.travis.yml b/.travis.yml
index d2edddbf..78d66eb5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,7 +3,7 @@ language: go
sudo: required
go:
- - "1.10.x"
+ - "1.11.x"
before_install:
- go version
@@ -20,6 +20,7 @@ install:
- go get -u "github.com/sirupsen/logrus"
- go get -u "github.com/pkg/errors"
- go get -u "github.com/stretchr/testify/assert"
+ - go get -u "github.com/shirou/gopsutil/process"
- composer install --no-interaction --prefer-source --ignore-platform-reqs
script:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c5e6b0c6..4d147c5d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,12 @@
CHANGELOG
=========
+v1.2.3 (29.09.2018)
+------
+- reduced verbosity
+- worker list has been extracted from http service and now available for other rr based services
+- built using Go 1.11
+
v1.2.2 (23.09.2018)
------
- new project directory structure
diff --git a/build.sh b/build.sh
index 1c2d8128..32964a57 100755
--- a/build.sh
+++ b/build.sh
@@ -2,7 +2,7 @@
cd $(dirname "${BASH_SOURCE[0]}")
OD="$(pwd)"
# Pushes application version into the build information.
-RR_VERSION=1.2.2
+RR_VERSION=1.2.3
# Hardcode some values to the core package
LDFLAGS="$LDFLAGS -X github.com/spiral/roadrunner/cmd/rr/cmd.Version=${RR_VERSION}"
diff --git a/cmd/rr/cmd/root.go b/cmd/rr/cmd/root.go
index 4ab37967..e71ce0c4 100644
--- a/cmd/rr/cmd/root.go
+++ b/cmd/rr/cmd/root.go
@@ -24,7 +24,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
- "github.com/spiral/roadrunner/cmd/rr/utils"
+ "github.com/spiral/roadrunner/cmd/util"
"github.com/spiral/roadrunner/service"
"os"
"path/filepath"
@@ -51,7 +51,7 @@ var (
Use: "rr",
SilenceErrors: true,
SilenceUsage: true,
- Short: utils.Sprintf(
+ Short: util.Sprintf(
"<green>RoadRunner, PHP Application Server:</reset>\nVersion: <yellow+hb>%s</reset>, %s",
Version,
BuildTime,
@@ -83,7 +83,7 @@ func (w *ViperWrapper) Unmarshal(out interface{}) error {
// This is called by main.main(). It only needs to happen once to the CLI.
func Execute() {
if err := CLI.Execute(); err != nil {
- utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
+ util.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
os.Exit(1)
}
}
@@ -100,7 +100,7 @@ func init() {
if cfg := initConfig(cfgFile, []string{"."}, ".rr"); cfg != nil {
if err := Container.Init(cfg); err != nil {
- utils.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
+ util.Printf("<red+hb>Error:</reset> <red>%s</reset>\n", err)
os.Exit(1)
}
}
diff --git a/cmd/rr/debug/debugger.go b/cmd/rr/debug/debugger.go
deleted file mode 100644
index ed9a1a56..00000000
--- a/cmd/rr/debug/debugger.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package debug
-
-import (
- "fmt"
- "github.com/sirupsen/logrus"
- "github.com/spiral/roadrunner"
- "github.com/spiral/roadrunner/cmd/rr/utils"
- rrhttp "github.com/spiral/roadrunner/service/http"
- "net/http"
- "strings"
-)
-
-// Listener creates new debug listener.
-func Listener(logger *logrus.Logger) func(event int, ctx interface{}) {
- return (&debugger{logger}).listener
-}
-
-// listener provide debug callback for system events. With colors!
-type debugger struct{ logger *logrus.Logger }
-
-// listener listens to http events and generates nice looking output.
-func (s *debugger) listener(event int, ctx interface{}) {
- // http events
- switch event {
- case rrhttp.EventResponse:
- e := ctx.(*rrhttp.ResponseEvent)
- s.logger.Info(utils.Sprintf(
- "<cyan+h>%s</reset> %s <white+hb>%s</reset> %s",
- e.Request.RemoteAddr,
- statusColor(e.Response.Status),
- e.Request.Method,
- e.Request.URI,
- ))
- case rrhttp.EventError:
- e := ctx.(*rrhttp.ErrorEvent)
-
- if _, ok := e.Error.(roadrunner.JobError); ok {
- s.logger.Info(utils.Sprintf(
- "%s <white+hb>%s</reset> %s",
- statusColor(500),
- e.Request.Method,
- uri(e.Request),
- ))
- } else {
- s.logger.Info(utils.Sprintf(
- "%s <white+hb>%s</reset> %s <red>%s</reset>",
- statusColor(500),
- e.Request.Method,
- uri(e.Request),
- e.Error,
- ))
- }
- }
-
- switch event {
- case roadrunner.EventWorkerKill:
- w := ctx.(*roadrunner.Worker)
- s.logger.Warning(utils.Sprintf(
- "<white+hb>worker.%v</reset> <yellow>killed</red>",
- *w.Pid,
- ))
- case roadrunner.EventWorkerError:
- err := ctx.(roadrunner.WorkerError)
- s.logger.Error(utils.Sprintf(
- "<white+hb>worker.%v</reset> <red>%s</reset>",
- *err.Worker.Pid,
- err.Caused,
- ))
- }
-
- // outputs
- switch event {
- case roadrunner.EventStderrOutput:
- s.logger.Warning(utils.Sprintf(
- "<yellow>%s</reset>",
- strings.Trim(string(ctx.([]byte)), "\r\n"),
- ))
- }
-
- // rr server events
- switch event {
- case roadrunner.EventServerFailure:
- s.logger.Error(utils.Sprintf("<red>server is dead</reset>"))
- }
-
- // pool events
- switch event {
- case roadrunner.EventPoolConstruct:
- s.logger.Debug(utils.Sprintf("<cyan>new worker pool</reset>"))
- case roadrunner.EventPoolError:
- s.logger.Error(utils.Sprintf("<red>%s</reset>", ctx))
- }
-
- //s.logger.Warning(event, ctx)
-}
-
-func statusColor(status int) string {
- if status < 300 {
- return utils.Sprintf("<green>%v</reset>", status)
- }
-
- if status < 400 {
- return utils.Sprintf("<cyan>%v</reset>", status)
- }
-
- if status < 500 {
- return utils.Sprintf("<yellow>%v</reset>", status)
- }
-
- return utils.Sprintf("<red>%v</reset>", status)
-}
-
-// uri fetches full uri from request in a form of string (including https scheme if TLS connection is enabled).
-func uri(r *http.Request) string {
- if r.TLS != nil {
- return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
- }
-
- return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
-}
diff --git a/cmd/rr/http/debug.go b/cmd/rr/http/debug.go
index f69e10a8..53980303 100644
--- a/cmd/rr/http/debug.go
+++ b/cmd/rr/http/debug.go
@@ -1,20 +1,90 @@
package http
import (
- rr "github.com/spiral/roadrunner/cmd/rr/cmd"
-
+ "fmt"
+ "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
- "github.com/spiral/roadrunner/cmd/rr/debug"
- "github.com/spiral/roadrunner/service/http"
+ "github.com/spiral/roadrunner"
+ rr "github.com/spiral/roadrunner/cmd/rr/cmd"
+ "github.com/spiral/roadrunner/cmd/util"
+ rrhttp "github.com/spiral/roadrunner/service/http"
+ "net/http"
)
func init() {
cobra.OnInitialize(func() {
if rr.Debug {
- svc, _ := rr.Container.Get(http.ID)
- if svc, ok := svc.(*http.Service); ok {
- svc.AddListener(debug.Listener(rr.Logger))
+ svc, _ := rr.Container.Get(rrhttp.ID)
+ if svc, ok := svc.(*rrhttp.Service); ok {
+ svc.AddListener((&debugger{logger: rr.Logger}).listener)
}
}
})
}
+
+// listener provide debug callback for system events. With colors!
+type debugger struct{ logger *logrus.Logger }
+
+// listener listens to http events and generates nice looking output.
+func (s *debugger) listener(event int, ctx interface{}) {
+ if util.LogEvent(s.logger, event, ctx) {
+ // handler by default debug package
+ return
+ }
+
+ // http events
+ switch event {
+ case rrhttp.EventResponse:
+ e := ctx.(*rrhttp.ResponseEvent)
+ s.logger.Info(util.Sprintf(
+ "<cyan+h>%s</reset> %s <white+hb>%s</reset> %s",
+ e.Request.RemoteAddr,
+ statusColor(e.Response.Status),
+ e.Request.Method,
+ e.Request.URI,
+ ))
+ case rrhttp.EventError:
+ e := ctx.(*rrhttp.ErrorEvent)
+
+ if _, ok := e.Error.(roadrunner.JobError); ok {
+ s.logger.Info(util.Sprintf(
+ "%s <white+hb>%s</reset> %s",
+ statusColor(500),
+ e.Request.Method,
+ uri(e.Request),
+ ))
+ } else {
+ s.logger.Info(util.Sprintf(
+ "%s <white+hb>%s</reset> %s <red>%s</reset>",
+ statusColor(500),
+ e.Request.Method,
+ uri(e.Request),
+ e.Error,
+ ))
+ }
+ }
+}
+
+func statusColor(status int) string {
+ if status < 300 {
+ return util.Sprintf("<green>%v</reset>", status)
+ }
+
+ if status < 400 {
+ return util.Sprintf("<cyan>%v</reset>", status)
+ }
+
+ if status < 500 {
+ return util.Sprintf("<yellow>%v</reset>", status)
+ }
+
+ return util.Sprintf("<red>%v</reset>", status)
+}
+
+func uri(r *http.Request) string {
+ if r.TLS != nil {
+ return fmt.Sprintf("https://%s%s", r.Host, r.URL.String())
+ }
+
+ return fmt.Sprintf("http://%s%s", r.Host, r.URL.String())
+}
diff --git a/cmd/rr/http/reset.go b/cmd/rr/http/reset.go
index 3bc089ec..42fd966d 100644
--- a/cmd/rr/http/reset.go
+++ b/cmd/rr/http/reset.go
@@ -21,41 +21,33 @@
package http
import (
- "errors"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/cmd/rr/utils"
- "github.com/spiral/roadrunner/service"
- "github.com/spiral/roadrunner/service/rpc"
+ "github.com/spiral/roadrunner/cmd/util"
)
func init() {
rr.CLI.AddCommand(&cobra.Command{
Use: "http:reset",
- Short: "Reload RoadRunner worker pools for the HTTP service",
+ Short: "Reload RoadRunner worker pool for the HTTP service",
RunE: reloadHandler,
})
}
func reloadHandler(cmd *cobra.Command, args []string) error {
- svc, st := rr.Container.Get(rpc.ID)
- if st < service.StatusOK {
- return errors.New("RPC service is not configured")
- }
-
- client, err := svc.(*rpc.Service).Client()
+ client, err := util.RPCClient(rr.Container)
if err != nil {
return err
}
defer client.Close()
- utils.Printf("<green>restarting http worker pool</reset>: ")
+ util.Printf("<green>restarting http worker pool</reset>: ")
var r string
if err := client.Call("http.Reset", true, &r); err != nil {
return err
}
- utils.Printf("<green+hb>done</reset>\n")
+ util.Printf("<green+hb>done</reset>\n")
return nil
}
diff --git a/cmd/rr/http/workers.go b/cmd/rr/http/workers.go
index b03c273f..4444b87f 100644
--- a/cmd/rr/http/workers.go
+++ b/cmd/rr/http/workers.go
@@ -21,21 +21,14 @@
package http
import (
- "errors"
tm "github.com/buger/goterm"
- "github.com/dustin/go-humanize"
- "github.com/olekukonko/tablewriter"
- "github.com/shirou/gopsutil/process"
"github.com/spf13/cobra"
rr "github.com/spiral/roadrunner/cmd/rr/cmd"
- "github.com/spiral/roadrunner/cmd/rr/utils"
- "github.com/spiral/roadrunner/service"
+ "github.com/spiral/roadrunner/cmd/util"
"github.com/spiral/roadrunner/service/http"
- rrpc "github.com/spiral/roadrunner/service/rpc"
"net/rpc"
"os"
"os/signal"
- "strconv"
"syscall"
"time"
)
@@ -73,12 +66,7 @@ func workersHandler(cmd *cobra.Command, args []string) (err error) {
}
}()
- svc, st := rr.Container.Get(rrpc.ID)
- if st < service.StatusOK {
- return errors.New("RPC service is not configured")
- }
-
- client, err := svc.(*rrpc.Service).Client()
+ client, err := util.RPCClient(rr.Container)
if err != nil {
return err
}
@@ -108,58 +96,5 @@ func showWorkers(client *rpc.Client) {
panic(err)
}
- tw := tablewriter.NewWriter(os.Stdout)
- tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "Created"})
- tw.SetColMinWidth(0, 7)
- tw.SetColMinWidth(1, 9)
- tw.SetColMinWidth(2, 7)
- tw.SetColMinWidth(3, 7)
- tw.SetColMinWidth(4, 18)
-
- for _, w := range r.Workers {
- tw.Append([]string{
- strconv.Itoa(w.Pid),
- renderStatus(w.Status),
- renderJobs(w.NumJobs),
- renderMemory(w.Pid),
- renderAlive(time.Unix(0, w.Created)),
- })
- }
-
- tw.Render()
-}
-
-func renderStatus(status string) string {
- switch status {
- case "inactive":
- return utils.Sprintf("<yellow>inactive</reset>")
- case "ready":
- return utils.Sprintf("<cyan>ready</reset>")
- case "working":
- return utils.Sprintf("<green>working</reset>")
- case "stopped":
- return utils.Sprintf("<red>stopped</reset>")
- case "errored":
- return utils.Sprintf("<red>errored</reset>")
- }
-
- return status
-}
-
-func renderJobs(number int64) string {
- return humanize.Comma(int64(number))
-}
-
-func renderAlive(t time.Time) string {
- return humanize.RelTime(t, time.Now(), "ago", "")
-}
-
-func renderMemory(pid int) string {
- p, _ := process.NewProcess(int32(pid))
- i, err := p.MemoryInfo()
- if err != nil {
- return err.Error()
- }
-
- return humanize.Bytes(i.RSS)
+ util.WorkerTable(r.Workers).Render()
}
diff --git a/cmd/rr/main.go b/cmd/rr/main.go
index 18e22cdd..4d2a06b4 100644
--- a/cmd/rr/main.go
+++ b/cmd/rr/main.go
@@ -32,7 +32,7 @@ import (
"github.com/spiral/roadrunner/service/rpc"
"github.com/spiral/roadrunner/service/static"
- // additional command and debug handlers
+ // additional commands and debug handlers
_ "github.com/spiral/roadrunner/cmd/rr/http"
)
diff --git a/cmd/rr/utils/cprint.go b/cmd/util/cprint.go
index 020975ec..0985de62 100644
--- a/cmd/rr/utils/cprint.go
+++ b/cmd/util/cprint.go
@@ -1,4 +1,4 @@
-package utils
+package util
import (
"fmt"
diff --git a/cmd/util/debug.go b/cmd/util/debug.go
new file mode 100644
index 00000000..7be258ec
--- /dev/null
+++ b/cmd/util/debug.go
@@ -0,0 +1,57 @@
+package util
+
+import (
+ "github.com/sirupsen/logrus"
+ "github.com/spiral/roadrunner"
+ "strings"
+)
+
+// LogEvent outputs rr event into given logger and return false if event was not handled.
+func LogEvent(logger *logrus.Logger, event int, ctx interface{}) bool {
+ switch event {
+ case roadrunner.EventWorkerKill:
+ w := ctx.(*roadrunner.Worker)
+ logger.Warning(Sprintf(
+ "<white+hb>worker.%v</reset> <yellow>killed</red>",
+ *w.Pid,
+ ))
+ return true
+ case roadrunner.EventWorkerError:
+ err := ctx.(roadrunner.WorkerError)
+ logger.Error(Sprintf(
+ "<white+hb>worker.%v</reset> <red>%s</reset>",
+ *err.Worker.Pid,
+ err.Caused,
+ ))
+ return true
+ }
+
+ // outputs
+ switch event {
+ case roadrunner.EventStderrOutput:
+ logger.Warning(Sprintf(
+ "<yellow>%s</reset>",
+ strings.Trim(string(ctx.([]byte)), "\r\n"),
+ ))
+ return true
+ }
+
+ // rr server events
+ switch event {
+ case roadrunner.EventServerFailure:
+ logger.Error(Sprintf("<red>server is dead</reset>"))
+ return true
+ }
+
+ // pool events
+ switch event {
+ case roadrunner.EventPoolConstruct:
+ logger.Debug(Sprintf("<cyan>new worker pool</reset>"))
+ return true
+ case roadrunner.EventPoolError:
+ logger.Error(Sprintf("<red>%s</reset>", ctx))
+ return true
+ }
+
+ return false
+}
diff --git a/cmd/util/rpc.go b/cmd/util/rpc.go
new file mode 100644
index 00000000..ee3414a6
--- /dev/null
+++ b/cmd/util/rpc.go
@@ -0,0 +1,18 @@
+package util
+
+import (
+ "errors"
+ "github.com/spiral/roadrunner/service"
+ rrpc "github.com/spiral/roadrunner/service/rpc"
+ "net/rpc"
+)
+
+// RPCClient returns RPC client associated with given roadrunner service container.
+func RPCClient(container service.Container) (*rpc.Client, error) {
+ svc, st := container.Get(rrpc.ID)
+ if st < service.StatusOK {
+ return nil, errors.New("RPC service is not configured")
+ }
+
+ return svc.(*rrpc.Service).Client()
+}
diff --git a/cmd/util/table.go b/cmd/util/table.go
new file mode 100644
index 00000000..565c0679
--- /dev/null
+++ b/cmd/util/table.go
@@ -0,0 +1,58 @@
+package util
+
+import (
+ "github.com/dustin/go-humanize"
+ "github.com/olekukonko/tablewriter"
+ rrutil "github.com/spiral/roadrunner/util"
+ "os"
+ "strconv"
+ "time"
+)
+
+// WorkerTable renders table with information about rr server workers.
+func WorkerTable(workers []*rrutil.State) *tablewriter.Table {
+ tw := tablewriter.NewWriter(os.Stdout)
+ tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "Created"})
+ tw.SetColMinWidth(0, 7)
+ tw.SetColMinWidth(1, 9)
+ tw.SetColMinWidth(2, 7)
+ tw.SetColMinWidth(3, 7)
+ tw.SetColMinWidth(4, 18)
+
+ for _, w := range workers {
+ tw.Append([]string{
+ strconv.Itoa(w.Pid),
+ renderStatus(w.Status),
+ renderJobs(w.NumJobs),
+ humanize.Bytes(w.MemoryUsage),
+ renderAlive(time.Unix(0, w.Created)),
+ })
+ }
+
+ return tw
+}
+
+func renderStatus(status string) string {
+ switch status {
+ case "inactive":
+ return Sprintf("<yellow>inactive</reset>")
+ case "ready":
+ return Sprintf("<cyan>ready</reset>")
+ case "working":
+ return Sprintf("<green>working</reset>")
+ case "stopped":
+ return Sprintf("<red>stopped</reset>")
+ case "errored":
+ return Sprintf("<red>errored</reset>")
+ }
+
+ return status
+}
+
+func renderJobs(number int64) string {
+ return humanize.Comma(int64(number))
+}
+
+func renderAlive(t time.Time) string {
+ return humanize.RelTime(t, time.Now(), "ago", "")
+}
diff --git a/service/container.go b/service/container.go
index fc1012c8..0ddb4251 100644
--- a/service/container.go
+++ b/service/container.go
@@ -95,8 +95,6 @@ func (c *container) Register(name string, service interface{}) {
svc: service,
status: StatusRegistered,
})
-
- c.log.Debugf("[%s]: registered", name)
}
// Check hasStatus svc has been registered.
@@ -138,14 +136,13 @@ func (c *container) Init(cfg Config) error {
if ok, err := c.initService(e.svc, cfg.Get(e.name)); err != nil {
// soft error (skipping)
if err == errNoConfig {
- c.log.Debugf("[%s]: no config has been provided", e.name)
+ c.log.Debugf("[%s]: disabled", e.name)
continue
}
return errors.Wrap(err, fmt.Sprintf("[%s]", e.name))
} else if ok {
e.setStatus(StatusOK)
- c.log.Debugf("[%s]: initiated", e.name)
} else {
c.log.Debugf("[%s]: disabled", e.name)
}
@@ -168,7 +165,7 @@ func (c *container) Serve() error {
continue
}
- c.log.Debugf("[%s]: service started", e.name)
+ c.log.Debugf("[%s]: started", e.name)
go func(e *entry) {
e.setStatus(StatusServing)
defer e.setStatus(StatusStopped)
@@ -202,8 +199,6 @@ func (c *container) Serve() error {
// Stop sends stop command to all running services.
func (c *container) Stop() {
- c.log.Debugf("received stop command")
-
for _, e := range c.services {
if e.hasStatus(StatusServing) {
e.svc.(Service).Stop()
diff --git a/service/container_test.go b/service/container_test.go
index 8eeb647a..7d83ee9a 100644
--- a/service/container_test.go
+++ b/service/container_test.go
@@ -117,7 +117,7 @@ func TestContainer_Register(t *testing.T) {
c := NewContainer(logger)
c.Register("test", &testService{})
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
}
func TestContainer_Has(t *testing.T) {
@@ -127,7 +127,7 @@ func TestContainer_Has(t *testing.T) {
c := NewContainer(logger)
c.Register("test", &testService{})
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.True(t, c.Has("test"))
assert.False(t, c.Has("another"))
@@ -139,8 +139,7 @@ func TestContainer_Get(t *testing.T) {
c := NewContainer(logger)
c.Register("test", &testService{})
-
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
s, st := c.Get("test")
assert.IsType(t, &testService{}, s)
@@ -157,7 +156,7 @@ func TestContainer_Stop_NotStarted(t *testing.T) {
c := NewContainer(logger)
c.Register("test", &testService{})
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
c.Stop()
}
@@ -170,7 +169,7 @@ func TestContainer_Configure(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
@@ -187,7 +186,7 @@ func TestContainer_Init_Default(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{}`}))
@@ -206,7 +205,7 @@ func TestContainer_Init_Default_Overwrite(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{"test":{"value": "something"}}`}))
@@ -225,10 +224,10 @@ func TestContainer_ConfigureNull(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{"another":"something"}`}))
- assert.Equal(t, 2, len(hook.Entries))
+ assert.Equal(t, 1, len(hook.Entries))
s, st := c.Get("test")
assert.IsType(t, &testService{}, s)
@@ -243,10 +242,10 @@ func TestContainer_ConfigureDisabled(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
- assert.Equal(t, 2, len(hook.Entries))
+ assert.Equal(t, 1, len(hook.Entries))
s, st := c.Get("test")
assert.IsType(t, &testService{}, s)
@@ -264,7 +263,7 @@ func TestContainer_ConfigureError(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
err := c.Init(&testCfg{`{"test":"something"}`})
assert.Error(t, err)
@@ -284,7 +283,7 @@ func TestContainer_ConfigureTwice(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
assert.Error(t, c.Init(&testCfg{`{"test":"something"}`}))
@@ -298,7 +297,7 @@ func TestContainer_ServeEmptyContainer(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Serve())
c.Stop()
@@ -315,7 +314,7 @@ func TestContainer_Serve(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
go func() {
@@ -347,7 +346,7 @@ func TestContainer_ServeError(t *testing.T) {
c := NewContainer(logger)
c.Register("test", svc)
- assert.Equal(t, 1, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{"test":"something"}`}))
err := c.Serve()
@@ -379,7 +378,7 @@ func TestContainer_ServeErrorMultiple(t *testing.T) {
c := NewContainer(logger)
c.Register("test2", svc2)
c.Register("test", svc)
- assert.Equal(t, 2, len(hook.Entries))
+ assert.Equal(t, 0, len(hook.Entries))
assert.NoError(t, c.Init(&testCfg{`{"test":"something", "test2":"something-else"}`}))
err := c.Serve()
diff --git a/service/http/handler.go b/service/http/handler.go
index 945cd51e..f719c751 100644
--- a/service/http/handler.go
+++ b/service/http/handler.go
@@ -72,10 +72,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
- if err = req.Open(); err != nil {
- h.handleError(w, r, err)
- return
- }
+ req.Open()
defer req.Close()
p, err := req.Payload()
diff --git a/service/http/request.go b/service/http/request.go
index d733b20c..eb5c05bd 100644
--- a/service/http/request.go
+++ b/service/http/request.go
@@ -109,12 +109,12 @@ func NewRequest(r *http.Request, cfg *UploadsConfig) (req *Request, err error) {
}
// Open moves all uploaded files to temporary directory so it can be given to php later.
-func (r *Request) Open() error {
+func (r *Request) Open() {
if r.Uploads == nil {
- return nil
+ return
}
- return r.Uploads.Open()
+ r.Uploads.Open()
}
// Close clears all temp file uploads
diff --git a/service/http/rpc.go b/service/http/rpc.go
index 9dfe718e..08b3f262 100644
--- a/service/http/rpc.go
+++ b/service/http/rpc.go
@@ -2,6 +2,7 @@ package http
import (
"github.com/pkg/errors"
+ "github.com/spiral/roadrunner/util"
)
type rpcServer struct{ svc *Service }
@@ -9,22 +10,7 @@ type rpcServer struct{ svc *Service }
// WorkerList contains list of workers.
type WorkerList struct {
// Workers is list of workers.
- Workers []Worker `json:"workers"`
-}
-
-// Worker provides information about specific worker.
-type Worker struct {
- // Pid contains process id.
- Pid int `json:"pid"`
-
- // Status of the worker.
- Status string `json:"status"`
-
- // Number of worker executions.
- NumJobs int64 `json:"numExecs"`
-
- // Created is unix nano timestamp of worker creation time.
- Created int64 `json:"created"`
+ Workers []*util.State `json:"workers"`
}
// Reset resets underlying RR worker pool and restarts all of it's workers.
@@ -38,20 +24,11 @@ func (rpc *rpcServer) Reset(reset bool, r *string) error {
}
// Workers returns list of active workers and their stats.
-func (rpc *rpcServer) Workers(list bool, r *WorkerList) error {
+func (rpc *rpcServer) Workers(list bool, r *WorkerList) (err error) {
if rpc.svc == nil || rpc.svc.srv == nil {
return errors.New("http server is not running")
}
- for _, w := range rpc.svc.rr.Workers() {
- state := w.State()
- r.Workers = append(r.Workers, Worker{
- Pid: *w.Pid,
- Status: state.String(),
- NumJobs: state.NumExecs(),
- Created: w.Created.UnixNano(),
- })
- }
-
- return nil
+ r.Workers, err = util.ServerState(rpc.svc.rr)
+ return err
}
diff --git a/service/http/uploads.go b/service/http/uploads.go
index 9b205f00..7610ab28 100644
--- a/service/http/uploads.go
+++ b/service/http/uploads.go
@@ -45,7 +45,7 @@ func (u *Uploads) MarshalJSON() ([]byte, error) {
// Open moves all uploaded files to temp directory, return error in case of issue with temp directory. File errors
// will be handled individually.
-func (u *Uploads) Open() error {
+func (u *Uploads) Open() {
var wg sync.WaitGroup
for _, f := range u.list {
wg.Add(1)
@@ -56,7 +56,6 @@ func (u *Uploads) Open() error {
}
wg.Wait()
- return nil
}
// Clear deletes all temporary files.
diff --git a/util/state.go b/util/state.go
new file mode 100644
index 00000000..29fca945
--- /dev/null
+++ b/util/state.go
@@ -0,0 +1,62 @@
+package util
+
+import (
+ "errors"
+ "github.com/shirou/gopsutil/process"
+ "github.com/spiral/roadrunner"
+)
+
+// State provides information about specific worker.
+type State struct {
+ // Pid contains process id.
+ Pid int `json:"pid"`
+
+ // Status of the worker.
+ Status string `json:"status"`
+
+ // Number of worker executions.
+ NumJobs int64 `json:"numExecs"`
+
+ // Created is unix nano timestamp of worker creation time.
+ Created int64 `json:"created"`
+
+ // MemoryUsage holds the information about worker memory usage in bytes.
+ // Values might vary for different operating systems and based on RSS.
+ MemoryUsage uint64 `json:"memoryUsage"`
+}
+
+// WorkerState creates new worker state definition.
+func WorkerState(w *roadrunner.Worker) (*State, error) {
+ p, _ := process.NewProcess(int32(*w.Pid))
+ i, err := p.MemoryInfo()
+ if err != nil {
+ return nil, err
+ }
+
+ return &State{
+ Pid: *w.Pid,
+ Status: w.State().String(),
+ NumJobs: w.State().NumExecs(),
+ Created: w.Created.UnixNano(),
+ MemoryUsage: i.RSS,
+ }, nil
+}
+
+// ServerState returns list of all worker states of a given rr server.
+func ServerState(rr *roadrunner.Server) ([]*State, error) {
+ if rr == nil {
+ return nil, errors.New("rr server is not running")
+ }
+
+ result := make([]*State, 0)
+ for _, w := range rr.Workers() {
+ state, err := WorkerState(w)
+ if err != nil {
+ return nil, err
+ }
+
+ result = append(result, state)
+ }
+
+ return result, nil
+}