diff options
Diffstat (limited to 'internal/cli/workers')
-rw-r--r-- | internal/cli/workers/command.go | 143 | ||||
-rw-r--r-- | internal/cli/workers/command_test.go | 49 | ||||
-rw-r--r-- | internal/cli/workers/render.go | 135 |
3 files changed, 327 insertions, 0 deletions
diff --git a/internal/cli/workers/command.go b/internal/cli/workers/command.go new file mode 100644 index 00000000..283887e4 --- /dev/null +++ b/internal/cli/workers/command.go @@ -0,0 +1,143 @@ +package workers + +import ( + "fmt" + "net/rpc" + "os" + "os/signal" + "syscall" + "time" + + "github.com/roadrunner-server/api/v2/plugins/jobs" + internalRpc "github.com/spiral/roadrunner-binary/v2/internal/rpc" + + tm "github.com/buger/goterm" + "github.com/fatih/color" + "github.com/spf13/cobra" + "github.com/spiral/errors" + "github.com/spiral/roadrunner-plugins/v2/config" + "github.com/spiral/roadrunner-plugins/v2/informer" +) + +// NewCommand creates `workers` command. +func NewCommand(cfgPlugin *config.Plugin) *cobra.Command { //nolint:funlen + var ( + // interactive workers updates + interactive bool + ) + + cmd := &cobra.Command{ + Use: "workers", + Short: "Show information about active RoadRunner workers", + RunE: func(_ *cobra.Command, args []string) error { + const ( + op = errors.Op("handle_workers_command") + informerList = "informer.List" + ) + + client, err := internalRpc.NewClient(cfgPlugin) + if err != nil { + return err + } + + defer func() { _ = client.Close() }() + + plugins := args // by default we expect plugins list from user + if len(plugins) == 0 { // but if nothing was passed - request all informers list + if err = client.Call(informerList, true, &plugins); err != nil { + return err + } + } + + if !interactive { + return showWorkers(plugins, client) + } + + oss := make(chan os.Signal, 1) + signal.Notify(oss, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + + tm.Clear() + + tt := time.NewTicker(time.Second) + defer tt.Stop() + + for { + select { + case <-oss: + return nil + + case <-tt.C: + tm.MoveCursor(1, 1) + tm.Flush() + + if err = showWorkers(plugins, client); err != nil { + return errors.E(op, err) + } + } + } + }, + } + + cmd.Flags().BoolVarP( + &interactive, + "interactive", + "i", + false, + "render interactive workers table", + ) + + return cmd +} + +func showWorkers(plugins []string, client *rpc.Client) error { + const ( + op = errors.Op("show_workers") + informerWorkers = "informer.Workers" + informerJobs = "informer.Jobs" + // this is only one exception to Render the workers, service plugin has the same workers as other plugins, + // but they are RAW processes and needs to be handled in a different way. We don't need a special RPC call, but + // need a special render method. + servicePluginName = "service" + ) + + for _, plugin := range plugins { + list := &informer.WorkerList{} + + if err := client.Call(informerWorkers, plugin, &list); err != nil { + return errors.E(op, err) + } + + if len(list.Workers) == 0 { + continue + } + + if plugin == servicePluginName { + fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin)) + ServiceWorkerTable(os.Stdout, list.Workers).Render() + + continue + } + + fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin)) + + WorkerTable(os.Stdout, list.Workers).Render() + } + + for _, plugin := range plugins { + var jst []*jobs.State + + if err := client.Call(informerJobs, plugin, &jst); err != nil { + return errors.E(op, err) + } + + // eq to nil + if len(jst) == 0 { + continue + } + + fmt.Printf("Jobs of [%s]:\n", color.HiYellowString(plugin)) + JobsTable(os.Stdout, jst).Render() + } + + return nil +} diff --git a/internal/cli/workers/command_test.go b/internal/cli/workers/command_test.go new file mode 100644 index 00000000..e593686d --- /dev/null +++ b/internal/cli/workers/command_test.go @@ -0,0 +1,49 @@ +package workers_test + +import ( + "testing" + + "github.com/spiral/roadrunner-binary/v2/internal/cli/workers" + + "github.com/spiral/roadrunner-plugins/v2/config" + "github.com/stretchr/testify/assert" +) + +func TestCommandProperties(t *testing.T) { + cmd := workers.NewCommand(&config.Plugin{}) + + assert.Equal(t, "workers", cmd.Use) + assert.NotNil(t, cmd.RunE) +} + +func TestCommandFlags(t *testing.T) { + cmd := workers.NewCommand(&config.Plugin{}) + + cases := []struct { + giveName string + wantShorthand string + wantDefault string + }{ + {giveName: "interactive", wantShorthand: "i", wantDefault: "false"}, + } + + for _, tt := range cases { + tt := tt + t.Run(tt.giveName, func(t *testing.T) { + flag := cmd.Flag(tt.giveName) + + if flag == nil { + assert.Failf(t, "flag not found", "flag [%s] was not found", tt.giveName) + + return + } + + assert.Equal(t, tt.wantShorthand, flag.Shorthand) + assert.Equal(t, tt.wantDefault, flag.DefValue) + }) + } +} + +func TestExecution(t *testing.T) { + t.Skip("Command execution is not implemented yet") +} diff --git a/internal/cli/workers/render.go b/internal/cli/workers/render.go new file mode 100644 index 00000000..0bdf09b6 --- /dev/null +++ b/internal/cli/workers/render.go @@ -0,0 +1,135 @@ +package workers + +import ( + "io" + "strconv" + "time" + + "github.com/dustin/go-humanize" + "github.com/fatih/color" + "github.com/olekukonko/tablewriter" + "github.com/roadrunner-server/api/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/state/process" +) + +const ( + Ready string = "READY" + Paused string = "PAUSED/STOPPED" +) + +// WorkerTable renders table with information about rr server workers. +func WorkerTable(writer io.Writer, workers []*process.State) *tablewriter.Table { + tw := tablewriter.NewWriter(writer) + tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "CPU%", "Created"}) + tw.SetColMinWidth(0, 7) + tw.SetColMinWidth(1, 9) + tw.SetColMinWidth(2, 7) + tw.SetColMinWidth(3, 7) + tw.SetColMinWidth(4, 7) + tw.SetColMinWidth(5, 18) + + for i := 0; i < len(workers); i++ { + tw.Append([]string{ + strconv.Itoa(workers[i].Pid), + renderStatus(workers[i].Status), + renderJobs(workers[i].NumJobs), + humanize.Bytes(workers[i].MemoryUsage), + renderCPU(workers[i].CPUPercent), + renderAlive(time.Unix(0, workers[i].Created)), + }) + } + + return tw +} + +// ServiceWorkerTable renders table with information about rr server workers. +func ServiceWorkerTable(writer io.Writer, workers []*process.State) *tablewriter.Table { + tw := tablewriter.NewWriter(writer) + tw.SetAutoWrapText(false) + tw.SetHeader([]string{"PID", "Memory", "CPU%", "Command"}) + tw.SetColMinWidth(0, 7) + tw.SetColMinWidth(1, 7) + tw.SetColMinWidth(2, 7) + tw.SetColMinWidth(3, 18) + tw.SetAlignment(tablewriter.ALIGN_LEFT) + + for i := 0; i < len(workers); i++ { + tw.Append([]string{ + strconv.Itoa(workers[i].Pid), + humanize.Bytes(workers[i].MemoryUsage), + renderCPU(workers[i].CPUPercent), + workers[i].Command, + }) + } + + return tw +} + +// JobsTable renders table with information about rr server jobs. +func JobsTable(writer io.Writer, jobs []*jobs.State) *tablewriter.Table { + tw := tablewriter.NewWriter(writer) + tw.SetAutoWrapText(false) + tw.SetHeader([]string{"Status", "Pipeline", "Driver", "Queue", "Active", "Delayed", "Reserved"}) + tw.SetColWidth(10) + tw.SetColWidth(10) + tw.SetColWidth(7) + tw.SetColWidth(15) + tw.SetColWidth(10) + tw.SetColWidth(10) + tw.SetColWidth(10) + tw.SetAlignment(tablewriter.ALIGN_LEFT) + + for i := 0; i < len(jobs); i++ { + tw.Append([]string{ + renderReady(jobs[i].Ready), + jobs[i].Pipeline, + jobs[i].Driver, + jobs[i].Queue, + strconv.Itoa(int(jobs[i].Active)), + strconv.Itoa(int(jobs[i].Delayed)), + strconv.Itoa(int(jobs[i].Reserved)), + }) + } + + return tw +} + +func renderReady(ready bool) string { + if ready { + return Ready + } + + return Paused +} + +//go:inline +func renderCPU(cpu float64) string { + return strconv.FormatFloat(cpu, 'f', 2, 64) +} + +func renderStatus(status string) string { + switch status { + case "inactive": + return color.YellowString("inactive") + case "ready": + return color.CyanString("ready") + case "working": + return color.GreenString("working") + case "invalid": + return color.YellowString("invalid") + case "stopped": + return color.RedString("stopped") + case "errored": + return color.RedString("errored") + default: + return status + } +} + +func renderJobs(number uint64) string { + return humanize.Comma(int64(number)) +} + +func renderAlive(t time.Time) string { + return humanize.RelTime(t, time.Now(), "ago", "") +} |