diff options
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/cli/reset.go | 104 | ||||
-rw-r--r-- | cmd/cli/root.go | 97 | ||||
-rw-r--r-- | cmd/cli/serve.go | 63 | ||||
-rw-r--r-- | cmd/cli/workers.go | 118 | ||||
-rw-r--r-- | cmd/main.go | 51 |
5 files changed, 433 insertions, 0 deletions
diff --git a/cmd/cli/reset.go b/cmd/cli/reset.go new file mode 100644 index 00000000..03b470e5 --- /dev/null +++ b/cmd/cli/reset.go @@ -0,0 +1,104 @@ +package cli + +import ( + "fmt" + "sync" + + "github.com/fatih/color" + "github.com/mattn/go-runewidth" + "github.com/spf13/cobra" + "github.com/spiral/errors" + "github.com/vbauerster/mpb/v5" + "github.com/vbauerster/mpb/v5/decor" +) + +const List string = "resetter.List" +const Reset string = "resetter.Reset" + +func init() { + root.AddCommand(&cobra.Command{ + Use: "reset", + Short: "Reset workers of all or specific RoadRunner service", + RunE: resetHandler, + }) +} + +func resetHandler(cmd *cobra.Command, args []string) error { + const op = errors.Op("reset handler") + client, err := RPCClient() + if err != nil { + return err + } + defer func() { + _ = client.Close() + }() + + var services []string + if len(args) != 0 { + services = args + } else { + err = client.Call(List, true, &services) + if err != nil { + return errors.E(op, err) + } + } + + var wg sync.WaitGroup + pr := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithWidth(6)) + wg.Add(len(services)) + + for _, service := range services { + var ( + bar *mpb.Bar + name = runewidth.FillRight(fmt.Sprintf("Resetting plugin: [%s]", color.HiYellowString(service)), 27) + result = make(chan interface{}) + ) + + bar = pr.AddSpinner( + 1, + mpb.SpinnerOnMiddle, + mpb.SpinnerStyle([]string{"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"}), + mpb.PrependDecorators(decor.Name(name)), + mpb.AppendDecorators(onComplete(result)), + ) + + // simulating some work + go func(service string, result chan interface{}) { + defer wg.Done() + defer bar.Increment() + + var done bool + err = client.Call(Reset, service, &done) + if err != nil { + result <- errors.E(op, err) + return + } + result <- nil + }(service, result) + } + + pr.Wait() + return nil +} + +func onComplete(result chan interface{}) decor.Decorator { + var ( + msg = "" + fn = func(s decor.Statistics) string { + select { + case r := <-result: + if err, ok := r.(error); ok { + msg = color.HiRedString(err.Error()) + return msg + } + + msg = color.HiGreenString("done") + return msg + default: + return msg + } + } + ) + + return decor.Any(fn) +} diff --git a/cmd/cli/root.go b/cmd/cli/root.go new file mode 100644 index 00000000..febe410b --- /dev/null +++ b/cmd/cli/root.go @@ -0,0 +1,97 @@ +package cli + +import ( + "log" + "net/rpc" + "os" + "path/filepath" + + "github.com/spiral/errors" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + rpcPlugin "github.com/spiral/roadrunner-plugins/rpc" + + "github.com/spiral/roadrunner-plugins/config" + + "github.com/spf13/cobra" + "github.com/spiral/endure" +) + +var ( + WorkDir string + CfgFile string + Container *endure.Endure + cfg *config.Viper + root = &cobra.Command{ + Use: "rr", + SilenceErrors: true, + SilenceUsage: true, + } +) + +func Execute() { + if err := root.Execute(); err != nil { + // exit with error, fatal invoke os.Exit(1) + log.Fatal(err) + } +} + +func init() { + root.PersistentFlags().StringVarP(&CfgFile, "config", "c", ".rr.yaml", "config file (default is .rr.yaml)") + root.PersistentFlags().StringVarP(&WorkDir, "WorkDir", "w", "", "work directory") + + cobra.OnInitialize(func() { + if CfgFile != "" { + if absPath, err := filepath.Abs(CfgFile); err == nil { + CfgFile = absPath + + // force working absPath related to config file + if err := os.Chdir(filepath.Dir(absPath)); err != nil { + panic(err) + } + } + } + + if WorkDir != "" { + if err := os.Chdir(WorkDir); err != nil { + panic(err) + } + } + + cfg = &config.Viper{} + cfg.Path = CfgFile + cfg.Prefix = "rr" + + // register config + err := Container.Register(cfg) + if err != nil { + panic(err) + } + }) +} + +// RPCClient is using to make a requests to the ./rr reset, ./rr workers +func RPCClient() (*rpc.Client, error) { + rpcConfig := &rpcPlugin.Config{} + + err := cfg.Init() + if err != nil { + return nil, err + } + + if !cfg.Has(rpcPlugin.PluginName) { + return nil, errors.E("rpc service disabled") + } + + err = cfg.UnmarshalKey(rpcPlugin.PluginName, rpcConfig) + if err != nil { + return nil, err + } + rpcConfig.InitDefaults() + + conn, err := rpcConfig.Dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil +} diff --git a/cmd/cli/serve.go b/cmd/cli/serve.go new file mode 100644 index 00000000..2fe54932 --- /dev/null +++ b/cmd/cli/serve.go @@ -0,0 +1,63 @@ +package cli + +import ( + "log" + "os" + "os/signal" + "syscall" + + "github.com/spf13/cobra" + "github.com/spiral/errors" + "go.uber.org/multierr" +) + +func init() { + root.AddCommand(&cobra.Command{ + Use: "serve", + Short: "Start RoadRunner server", + RunE: handler, + }) +} + +func handler(cmd *cobra.Command, args []string) error { + const op = errors.Op("handle serve command") + /* + We need to have path to the config at the RegisterTarget stage + But after cobra.Execute, because cobra fills up cli variables on this stage + */ + + err := Container.Init() + if err != nil { + return errors.E(op, err) + } + + errCh, err := Container.Serve() + if err != nil { + return errors.E(op, err) + } + + // https://golang.org/pkg/os/signal/#Notify + // should be of buffer size at least 1 + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + + for { + select { + case e := <-errCh: + err = multierr.Append(err, e.Error) + log.Printf("error occurred: %v, service: %s", e.Error.Error(), e.VertexID) + er := Container.Stop() + if er != nil { + err = multierr.Append(err, er) + return errors.E(op, err) + } + return errors.E(op, err) + case <-c: + err = Container.Stop() + if err != nil { + return errors.E(op, err) + } + return nil + } + } +} diff --git a/cmd/cli/workers.go b/cmd/cli/workers.go new file mode 100644 index 00000000..d34de40a --- /dev/null +++ b/cmd/cli/workers.go @@ -0,0 +1,118 @@ +package cli + +import ( + "fmt" + "log" + "net/rpc" + "os" + "os/signal" + "syscall" + "time" + + tm "github.com/buger/goterm" + "github.com/fatih/color" + "github.com/spf13/cobra" + "github.com/spiral/errors" + "github.com/spiral/roadrunner-plugins/informer" + "github.com/spiral/roadrunner/v2/tools" +) + +var ( + interactive bool +) + +const InformerList string = "informer.List" + +func init() { + workersCommand := &cobra.Command{ + Use: "workers", + Short: "Show information about active roadrunner workers", + RunE: workersHandler, + } + + workersCommand.Flags().BoolVarP( + &interactive, + "interactive", + "i", + false, + "render interactive workers table", + ) + + root.AddCommand(workersCommand) +} + +func workersHandler(cmd *cobra.Command, args []string) error { + const op = errors.Op("workers handler") + // get RPC client + client, err := RPCClient() + if err != nil { + return err + } + defer func() { + err := client.Close() + if err != nil { + log.Printf("error when closing RPCClient: error %v", err) + } + }() + + var plugins []string + // assume user wants to show workers from particular plugin + if len(args) != 0 { + plugins = args + } else { + err = client.Call(InformerList, true, &plugins) + if err != nil { + return errors.E(op, err) + } + } + + if !interactive { + return showWorkers(plugins, client) + } + + // https://golang.org/pkg/os/signal/#Notify + // should be of buffer size at least 1 + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + + tm.Clear() + tt := time.NewTicker(time.Second) + defer tt.Stop() + for { + select { + case <-c: + return nil + case <-tt.C: + tm.MoveCursor(1, 1) + err := showWorkers(plugins, client) + if err != nil { + return errors.E(op, err) + } + tm.Flush() + } + } +} + +func showWorkers(plugins []string, client *rpc.Client) error { + for _, plugin := range plugins { + list := &informer.WorkerList{} + err := client.Call("informer.Workers", plugin, &list) + if err != nil { + return err + } + + // it's a golang :) + ps := make([]tools.ProcessState, len(list.Workers)) + for i := 0; i < len(list.Workers); i++ { + ps[i].Created = list.Workers[i].Created + ps[i].NumJobs = list.Workers[i].NumJobs + ps[i].MemoryUsage = list.Workers[i].MemoryUsage + ps[i].Pid = list.Workers[i].Pid + ps[i].Status = list.Workers[i].Status + } + + fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin)) + tools.WorkerTable(os.Stdout, ps).Render() + } + return nil +} diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 00000000..8151d4fe --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "log" + + "github.com/spiral/endure" + "github.com/spiral/roadrunner-plugins/http" + "github.com/spiral/roadrunner-plugins/informer" + "github.com/spiral/roadrunner-plugins/logger" + "github.com/spiral/roadrunner-plugins/metrics" + "github.com/spiral/roadrunner-plugins/redis" + "github.com/spiral/roadrunner-plugins/reload" + "github.com/spiral/roadrunner-plugins/resetter" + "github.com/spiral/roadrunner-plugins/rpc" + "github.com/spiral/roadrunner-plugins/server" + "github.com/spiral/roadrunner/v2/cmd/cli" +) + +func main() { + var err error + cli.Container, err = endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel), endure.RetryOnFail(false)) + if err != nil { + log.Fatal(err) + } + + err = cli.Container.RegisterAll( + // logger plugin + &logger.ZapLogger{}, + // metrics plugin + &metrics.Plugin{}, + // redis plugin (internal) + &redis.Plugin{}, + // http server plugin + &http.Plugin{}, + // reload plugin + &reload.Plugin{}, + // informer plugin (./rr workers) + &informer.Plugin{}, + // resetter plugin (./rr reset) + &resetter.Plugin{}, + // rpc plugin (workers, reset) + &rpc.Plugin{}, + // server plugin (NewWorker, NewWorkerPool) + &server.Plugin{}, + ) + if err != nil { + log.Fatal(err) + } + + cli.Execute() +} |