summaryrefslogtreecommitdiff
path: root/cmd/cli
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-22 23:02:25 +0300
committerValery Piashchynski <[email protected]>2020-12-22 23:02:25 +0300
commitfd1e98bc6339abfa66523bf9d2208d00df8ee4bc (patch)
treeb679441276717e687a5b460ebeba7ad0eee69be9 /cmd/cli
parent40b6c3169931a3fef62b649db19ff01dc685b7d4 (diff)
events listeners refactor, CLI initial commit
Diffstat (limited to 'cmd/cli')
-rw-r--r--cmd/cli/reset.go97
-rw-r--r--cmd/cli/root.go138
-rw-r--r--cmd/cli/serve.go62
-rw-r--r--cmd/cli/workers.go108
4 files changed, 405 insertions, 0 deletions
diff --git a/cmd/cli/reset.go b/cmd/cli/reset.go
new file mode 100644
index 00000000..82cf8590
--- /dev/null
+++ b/cmd/cli/reset.go
@@ -0,0 +1,97 @@
+package cli
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/fatih/color"
+ "github.com/mattn/go-runewidth"
+ "github.com/spf13/cobra"
+ "github.com/vbauerster/mpb/v5"
+ "github.com/vbauerster/mpb/v5/decor"
+)
+
+func init() {
+ root.AddCommand(&cobra.Command{
+ Use: "reset",
+ Short: "Reset workers of all or specific RoadRunner service",
+ RunE: resetHandler,
+ })
+}
+
+func resetHandler(cmd *cobra.Command, args []string) error {
+ client, err := RPCClient()
+ if err != nil {
+ return err
+ }
+ defer client.Close()
+
+ var services []string
+ if len(args) != 0 {
+ services = args
+ } else {
+ err = client.Call("resetter.List", true, &services)
+ if err != nil {
+ return err
+ }
+ }
+
+ var wg sync.WaitGroup
+ pr := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithWidth(6))
+ wg.Add(len(services))
+
+ for _, service := range services {
+ var (
+ bar *mpb.Bar
+ name = runewidth.FillRight(fmt.Sprintf("Reset [%s]", color.HiYellowString(service)), 27)
+ result = make(chan interface{})
+ )
+
+ bar = pr.AddSpinner(
+ 1,
+ mpb.SpinnerOnMiddle,
+ mpb.SpinnerStyle([]string{"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"}),
+ mpb.PrependDecorators(decor.Name(name)),
+ mpb.AppendDecorators(onComplete(result)),
+ )
+
+ // simulating some work
+ go func(service string, result chan interface{}) {
+ defer wg.Done()
+ defer bar.Increment()
+
+ var done bool
+ err = client.Call("resetter.Reset", service, &done)
+ if err != nil {
+ result <- err
+ return
+ }
+ result <- nil
+ }(service, result)
+ }
+
+ pr.Wait()
+ return nil
+}
+
+func onComplete(result chan interface{}) decor.Decorator {
+ var (
+ msg = ""
+ fn = func(s decor.Statistics) string {
+ select {
+ case r := <-result:
+ if err, ok := r.(error); ok {
+ msg = color.HiRedString(err.Error())
+ return msg
+ }
+
+ msg = color.HiGreenString("done")
+ return msg
+ default:
+ return msg
+ }
+ }
+ )
+
+ return decor.Any(fn)
+}
diff --git a/cmd/cli/root.go b/cmd/cli/root.go
new file mode 100644
index 00000000..b070ca5e
--- /dev/null
+++ b/cmd/cli/root.go
@@ -0,0 +1,138 @@
+package cli
+
+import (
+ "log"
+ "net/rpc"
+ "os"
+ "path/filepath"
+
+ "github.com/spiral/errors"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner-plugins/logger"
+ rpcPlugin "github.com/spiral/roadrunner-plugins/rpc"
+
+ "github.com/spiral/roadrunner-plugins/config"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+
+ "github.com/spf13/cobra"
+ "github.com/spiral/endure"
+)
+
+var (
+ WorkDir string
+ CfgFile string
+ Container *endure.Endure
+ Logger *zap.Logger
+ cfg *config.Viper
+ root = &cobra.Command{
+ Use: "rr",
+ SilenceErrors: true,
+ SilenceUsage: true,
+ }
+)
+
+func Execute() {
+ if err := root.Execute(); err != nil {
+ // exit with error, fatal invoke os.Exit(1)
+ log.Fatal(err)
+ }
+}
+
+func init() {
+ root.PersistentFlags().StringVarP(&CfgFile, "config", "c", ".rr.yaml", "config file (default is .rr.yaml)")
+ root.PersistentFlags().StringVarP(&WorkDir, "WorkDir", "w", "", "work directory")
+
+ // todo: properly handle debug level
+ Logger = initLogger()
+ //endureLogger := logger.NewZapAdapter(Logger)
+
+ cobra.OnInitialize(func() {
+ if CfgFile != "" {
+ if absPath, err := filepath.Abs(CfgFile); err == nil {
+ CfgFile = absPath
+
+ // force working absPath related to config file
+ if err := os.Chdir(filepath.Dir(absPath)); err != nil {
+ panic(err)
+ }
+ }
+ }
+
+ if WorkDir != "" {
+ if err := os.Chdir(WorkDir); err != nil {
+ panic(err)
+ }
+ }
+
+ // todo: config is global, not only for serve
+ cfg = &config.Viper{}
+ cfg.Path = CfgFile
+ cfg.Prefix = "rr"
+
+ err := Container.Register(cfg)
+ if err != nil {
+ panic(err)
+ }
+
+ err = Container.Register(&logger.ZapLogger{})
+ if err != nil {
+ panic(err)
+ }
+ })
+}
+
+// todo: improve
+func RPCClient() (*rpc.Client, error) {
+ rpcConfig := &rpcPlugin.Config{}
+
+ err := cfg.Init()
+ if err != nil {
+ return nil, err
+ }
+
+ if !cfg.Has(rpcPlugin.PluginName) {
+ return nil, errors.E("rpc service disabled")
+ }
+
+ err = cfg.UnmarshalKey(rpcPlugin.PluginName, rpcConfig)
+ if err != nil {
+ return nil, err
+ }
+ rpcConfig.InitDefaults()
+
+ conn, err := rpcConfig.Dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
+}
+
+func initLogger() *zap.Logger {
+ // todo: we do not need it
+ cfg := zap.Config{
+ Level: zap.NewAtomicLevelAt(zap.ErrorLevel),
+ Encoding: "console",
+ EncoderConfig: zapcore.EncoderConfig{
+ MessageKey: "message",
+ LevelKey: "level",
+ TimeKey: "time",
+ CallerKey: "caller",
+ NameKey: "name",
+ StacktraceKey: "stack",
+ EncodeLevel: zapcore.CapitalLevelEncoder,
+ EncodeTime: zapcore.ISO8601TimeEncoder,
+ EncodeCaller: zapcore.ShortCallerEncoder,
+ },
+ OutputPaths: []string{"stderr"},
+ ErrorOutputPaths: []string{"stderr"},
+ }
+
+ l, err := cfg.Build(zap.AddCaller())
+ if err != nil {
+ panic(err)
+ }
+
+ return l
+}
diff --git a/cmd/cli/serve.go b/cmd/cli/serve.go
new file mode 100644
index 00000000..ace239fc
--- /dev/null
+++ b/cmd/cli/serve.go
@@ -0,0 +1,62 @@
+package cli
+
+import (
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/spiral/errors"
+ "go.uber.org/zap"
+
+ "github.com/spf13/cobra"
+)
+
+func init() {
+ root.AddCommand(&cobra.Command{
+ Use: "serve",
+ Short: "Start RoadRunner Temporal service(s)",
+ RunE: handler,
+ })
+}
+
+func handler(cmd *cobra.Command, args []string) error {
+ const op = errors.Op("handle serve command")
+ /*
+ We need to have path to the config at the RegisterTarget stage
+ But after cobra.Execute, because cobra fills up cli variables on this stage
+ */
+ err := Container.Init()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ errCh, err := Container.Serve()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // https://golang.org/pkg/os/signal/#Notify
+ // should be of buffer size at least 1
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
+
+ for {
+ select {
+ case e := <-errCh:
+ Logger.Error(e.Error.Error(), zap.String("service", e.VertexID))
+ er := Container.Stop()
+ if er != nil {
+ Logger.Error(e.Error.Error(), zap.String("service", e.VertexID))
+ if er != nil {
+ return errors.E(op, er)
+ }
+ }
+ case <-c:
+ err = Container.Stop()
+ if err != nil {
+ return errors.E(op, err)
+ }
+ return nil
+ }
+ }
+}
diff --git a/cmd/cli/workers.go b/cmd/cli/workers.go
new file mode 100644
index 00000000..e031ac6c
--- /dev/null
+++ b/cmd/cli/workers.go
@@ -0,0 +1,108 @@
+package cli
+
+import (
+ "fmt"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ tm "github.com/buger/goterm"
+ "github.com/spiral/roadrunner/v2/tools"
+
+ "github.com/fatih/color"
+ "github.com/spf13/cobra"
+ "github.com/spiral/roadrunner-plugins/informer"
+)
+
+var (
+ interactive bool
+ stopSignal = make(chan os.Signal, 1)
+)
+
+func init() {
+ workersCommand := &cobra.Command{
+ Use: "workers",
+ Short: "Show information about active roadrunner workers",
+ RunE: workersHandler,
+ }
+
+ workersCommand.Flags().BoolVarP(
+ &interactive,
+ "interactive",
+ "i",
+ false,
+ "render interactive workers table",
+ )
+
+ root.AddCommand(workersCommand)
+
+ signal.Notify(stopSignal, syscall.SIGTERM)
+ signal.Notify(stopSignal, syscall.SIGINT)
+}
+
+func workersHandler(cmd *cobra.Command, args []string) error {
+ client, err := RPCClient()
+ if err != nil {
+ return err
+ }
+ defer func() {
+ err := client.Close()
+ if err != nil {
+ Logger.Error(err.Error())
+ }
+ }()
+
+ var plugins []string
+ if len(args) != 0 {
+ plugins = args
+ } else {
+ err = client.Call("informer.List", true, &plugins)
+ if err != nil {
+ return err
+ }
+ }
+
+ if !interactive {
+ return showWorkers(plugins, client)
+ }
+
+ tm.Clear()
+ for {
+ select {
+ case <-stopSignal:
+ return nil
+ case <-time.NewTicker(time.Second).C:
+ tm.MoveCursor(1, 1)
+ showWorkers(plugins, client)
+ tm.Flush()
+ }
+ }
+}
+
+func showWorkers(plugins []string, client *rpc.Client) error {
+ for _, plugin := range plugins {
+ list := &informer.WorkerList{}
+ err := client.Call("informer.Workers", plugin, &list)
+ if err != nil {
+ return err
+ }
+
+ // it's a golang :)
+ ps := make([]tools.ProcessState, len(list.Workers))
+ for i := 0; i < len(list.Workers); i++ {
+ ps[i].Created = list.Workers[i].Created
+ ps[i].NumJobs = list.Workers[i].NumJobs
+ ps[i].MemoryUsage = list.Workers[i].MemoryUsage
+ ps[i].Pid = list.Workers[i].Pid
+ ps[i].Status = list.Workers[i].Status
+
+ }
+
+ fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin))
+ tools.WorkerTable(os.Stdout, ps).Render()
+ }
+
+ return nil
+}