summaryrefslogtreecommitdiff
path: root/internal/cli/workers
diff options
context:
space:
mode:
Diffstat (limited to 'internal/cli/workers')
-rw-r--r--internal/cli/workers/command.go143
-rw-r--r--internal/cli/workers/command_test.go49
-rw-r--r--internal/cli/workers/render.go135
3 files changed, 327 insertions, 0 deletions
diff --git a/internal/cli/workers/command.go b/internal/cli/workers/command.go
new file mode 100644
index 00000000..283887e4
--- /dev/null
+++ b/internal/cli/workers/command.go
@@ -0,0 +1,143 @@
+package workers
+
+import (
+ "fmt"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "github.com/roadrunner-server/api/v2/plugins/jobs"
+ internalRpc "github.com/spiral/roadrunner-binary/v2/internal/rpc"
+
+ tm "github.com/buger/goterm"
+ "github.com/fatih/color"
+ "github.com/spf13/cobra"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ "github.com/spiral/roadrunner-plugins/v2/informer"
+)
+
+// NewCommand creates `workers` command.
+func NewCommand(cfgPlugin *config.Plugin) *cobra.Command { //nolint:funlen
+ var (
+ // interactive workers updates
+ interactive bool
+ )
+
+ cmd := &cobra.Command{
+ Use: "workers",
+ Short: "Show information about active RoadRunner workers",
+ RunE: func(_ *cobra.Command, args []string) error {
+ const (
+ op = errors.Op("handle_workers_command")
+ informerList = "informer.List"
+ )
+
+ client, err := internalRpc.NewClient(cfgPlugin)
+ if err != nil {
+ return err
+ }
+
+ defer func() { _ = client.Close() }()
+
+ plugins := args // by default we expect plugins list from user
+ if len(plugins) == 0 { // but if nothing was passed - request all informers list
+ if err = client.Call(informerList, true, &plugins); err != nil {
+ return err
+ }
+ }
+
+ if !interactive {
+ return showWorkers(plugins, client)
+ }
+
+ oss := make(chan os.Signal, 1)
+ signal.Notify(oss, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
+
+ tm.Clear()
+
+ tt := time.NewTicker(time.Second)
+ defer tt.Stop()
+
+ for {
+ select {
+ case <-oss:
+ return nil
+
+ case <-tt.C:
+ tm.MoveCursor(1, 1)
+ tm.Flush()
+
+ if err = showWorkers(plugins, client); err != nil {
+ return errors.E(op, err)
+ }
+ }
+ }
+ },
+ }
+
+ cmd.Flags().BoolVarP(
+ &interactive,
+ "interactive",
+ "i",
+ false,
+ "render interactive workers table",
+ )
+
+ return cmd
+}
+
+func showWorkers(plugins []string, client *rpc.Client) error {
+ const (
+ op = errors.Op("show_workers")
+ informerWorkers = "informer.Workers"
+ informerJobs = "informer.Jobs"
+ // this is only one exception to Render the workers, service plugin has the same workers as other plugins,
+ // but they are RAW processes and needs to be handled in a different way. We don't need a special RPC call, but
+ // need a special render method.
+ servicePluginName = "service"
+ )
+
+ for _, plugin := range plugins {
+ list := &informer.WorkerList{}
+
+ if err := client.Call(informerWorkers, plugin, &list); err != nil {
+ return errors.E(op, err)
+ }
+
+ if len(list.Workers) == 0 {
+ continue
+ }
+
+ if plugin == servicePluginName {
+ fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin))
+ ServiceWorkerTable(os.Stdout, list.Workers).Render()
+
+ continue
+ }
+
+ fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin))
+
+ WorkerTable(os.Stdout, list.Workers).Render()
+ }
+
+ for _, plugin := range plugins {
+ var jst []*jobs.State
+
+ if err := client.Call(informerJobs, plugin, &jst); err != nil {
+ return errors.E(op, err)
+ }
+
+ // eq to nil
+ if len(jst) == 0 {
+ continue
+ }
+
+ fmt.Printf("Jobs of [%s]:\n", color.HiYellowString(plugin))
+ JobsTable(os.Stdout, jst).Render()
+ }
+
+ return nil
+}
diff --git a/internal/cli/workers/command_test.go b/internal/cli/workers/command_test.go
new file mode 100644
index 00000000..e593686d
--- /dev/null
+++ b/internal/cli/workers/command_test.go
@@ -0,0 +1,49 @@
+package workers_test
+
+import (
+ "testing"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/cli/workers"
+
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCommandProperties(t *testing.T) {
+ cmd := workers.NewCommand(&config.Plugin{})
+
+ assert.Equal(t, "workers", cmd.Use)
+ assert.NotNil(t, cmd.RunE)
+}
+
+func TestCommandFlags(t *testing.T) {
+ cmd := workers.NewCommand(&config.Plugin{})
+
+ cases := []struct {
+ giveName string
+ wantShorthand string
+ wantDefault string
+ }{
+ {giveName: "interactive", wantShorthand: "i", wantDefault: "false"},
+ }
+
+ for _, tt := range cases {
+ tt := tt
+ t.Run(tt.giveName, func(t *testing.T) {
+ flag := cmd.Flag(tt.giveName)
+
+ if flag == nil {
+ assert.Failf(t, "flag not found", "flag [%s] was not found", tt.giveName)
+
+ return
+ }
+
+ assert.Equal(t, tt.wantShorthand, flag.Shorthand)
+ assert.Equal(t, tt.wantDefault, flag.DefValue)
+ })
+ }
+}
+
+func TestExecution(t *testing.T) {
+ t.Skip("Command execution is not implemented yet")
+}
diff --git a/internal/cli/workers/render.go b/internal/cli/workers/render.go
new file mode 100644
index 00000000..0bdf09b6
--- /dev/null
+++ b/internal/cli/workers/render.go
@@ -0,0 +1,135 @@
+package workers
+
+import (
+ "io"
+ "strconv"
+ "time"
+
+ "github.com/dustin/go-humanize"
+ "github.com/fatih/color"
+ "github.com/olekukonko/tablewriter"
+ "github.com/roadrunner-server/api/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/state/process"
+)
+
+const (
+ Ready string = "READY"
+ Paused string = "PAUSED/STOPPED"
+)
+
+// WorkerTable renders table with information about rr server workers.
+func WorkerTable(writer io.Writer, workers []*process.State) *tablewriter.Table {
+ tw := tablewriter.NewWriter(writer)
+ tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "CPU%", "Created"})
+ tw.SetColMinWidth(0, 7)
+ tw.SetColMinWidth(1, 9)
+ tw.SetColMinWidth(2, 7)
+ tw.SetColMinWidth(3, 7)
+ tw.SetColMinWidth(4, 7)
+ tw.SetColMinWidth(5, 18)
+
+ for i := 0; i < len(workers); i++ {
+ tw.Append([]string{
+ strconv.Itoa(workers[i].Pid),
+ renderStatus(workers[i].Status),
+ renderJobs(workers[i].NumJobs),
+ humanize.Bytes(workers[i].MemoryUsage),
+ renderCPU(workers[i].CPUPercent),
+ renderAlive(time.Unix(0, workers[i].Created)),
+ })
+ }
+
+ return tw
+}
+
+// ServiceWorkerTable renders table with information about rr server workers.
+func ServiceWorkerTable(writer io.Writer, workers []*process.State) *tablewriter.Table {
+ tw := tablewriter.NewWriter(writer)
+ tw.SetAutoWrapText(false)
+ tw.SetHeader([]string{"PID", "Memory", "CPU%", "Command"})
+ tw.SetColMinWidth(0, 7)
+ tw.SetColMinWidth(1, 7)
+ tw.SetColMinWidth(2, 7)
+ tw.SetColMinWidth(3, 18)
+ tw.SetAlignment(tablewriter.ALIGN_LEFT)
+
+ for i := 0; i < len(workers); i++ {
+ tw.Append([]string{
+ strconv.Itoa(workers[i].Pid),
+ humanize.Bytes(workers[i].MemoryUsage),
+ renderCPU(workers[i].CPUPercent),
+ workers[i].Command,
+ })
+ }
+
+ return tw
+}
+
+// JobsTable renders table with information about rr server jobs.
+func JobsTable(writer io.Writer, jobs []*jobs.State) *tablewriter.Table {
+ tw := tablewriter.NewWriter(writer)
+ tw.SetAutoWrapText(false)
+ tw.SetHeader([]string{"Status", "Pipeline", "Driver", "Queue", "Active", "Delayed", "Reserved"})
+ tw.SetColWidth(10)
+ tw.SetColWidth(10)
+ tw.SetColWidth(7)
+ tw.SetColWidth(15)
+ tw.SetColWidth(10)
+ tw.SetColWidth(10)
+ tw.SetColWidth(10)
+ tw.SetAlignment(tablewriter.ALIGN_LEFT)
+
+ for i := 0; i < len(jobs); i++ {
+ tw.Append([]string{
+ renderReady(jobs[i].Ready),
+ jobs[i].Pipeline,
+ jobs[i].Driver,
+ jobs[i].Queue,
+ strconv.Itoa(int(jobs[i].Active)),
+ strconv.Itoa(int(jobs[i].Delayed)),
+ strconv.Itoa(int(jobs[i].Reserved)),
+ })
+ }
+
+ return tw
+}
+
+func renderReady(ready bool) string {
+ if ready {
+ return Ready
+ }
+
+ return Paused
+}
+
+//go:inline
+func renderCPU(cpu float64) string {
+ return strconv.FormatFloat(cpu, 'f', 2, 64)
+}
+
+func renderStatus(status string) string {
+ switch status {
+ case "inactive":
+ return color.YellowString("inactive")
+ case "ready":
+ return color.CyanString("ready")
+ case "working":
+ return color.GreenString("working")
+ case "invalid":
+ return color.YellowString("invalid")
+ case "stopped":
+ return color.RedString("stopped")
+ case "errored":
+ return color.RedString("errored")
+ default:
+ return status
+ }
+}
+
+func renderJobs(number uint64) string {
+ return humanize.Comma(int64(number))
+}
+
+func renderAlive(t time.Time) string {
+ return humanize.RelTime(t, time.Now(), "ago", "")
+}