diff options
Diffstat (limited to 'internal')
-rw-r--r-- | internal/cli/reset/command.go | 104 | ||||
-rw-r--r-- | internal/cli/reset/command_test.go | 21 | ||||
-rw-r--r-- | internal/cli/root.go | 98 | ||||
-rw-r--r-- | internal/cli/root_test.go | 85 | ||||
-rw-r--r-- | internal/cli/serve/command.go | 104 | ||||
-rw-r--r-- | internal/cli/serve/command_test.go | 21 | ||||
-rw-r--r-- | internal/cli/workers/command.go | 143 | ||||
-rw-r--r-- | internal/cli/workers/command_test.go | 49 | ||||
-rw-r--r-- | internal/cli/workers/render.go | 135 | ||||
-rw-r--r-- | internal/container/config.go | 83 | ||||
-rw-r--r-- | internal/container/config_test.go | 82 | ||||
-rw-r--r-- | internal/container/container.go | 21 | ||||
-rw-r--r-- | internal/container/container_test.go | 27 | ||||
-rw-r--r-- | internal/container/plugins.go | 104 | ||||
-rw-r--r-- | internal/container/plugins_test.go | 20 | ||||
-rw-r--r-- | internal/debug/server.go | 37 | ||||
-rw-r--r-- | internal/debug/server_test.go | 57 | ||||
-rw-r--r-- | internal/meta/meta.go | 23 | ||||
-rw-r--r-- | internal/meta/meta_test.go | 49 | ||||
-rwxr-xr-x | internal/protocol.go | 111 | ||||
-rw-r--r-- | internal/rpc/client.go | 33 | ||||
-rw-r--r-- | internal/rpc/client_test.go | 60 |
22 files changed, 1356 insertions, 111 deletions
diff --git a/internal/cli/reset/command.go b/internal/cli/reset/command.go new file mode 100644 index 00000000..d6cf7087 --- /dev/null +++ b/internal/cli/reset/command.go @@ -0,0 +1,104 @@ +package reset + +import ( + "fmt" + "sync" + + internalRpc "github.com/spiral/roadrunner-binary/v2/internal/rpc" + + "github.com/fatih/color" + "github.com/mattn/go-runewidth" + "github.com/spf13/cobra" + "github.com/spiral/errors" + "github.com/spiral/roadrunner-plugins/v2/config" + "github.com/vbauerster/mpb/v5" + "github.com/vbauerster/mpb/v5/decor" +) + +var spinnerStyle = []string{"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"} //nolint:gochecknoglobals + +// NewCommand creates `reset` command. +func NewCommand(cfgPlugin *config.Plugin) *cobra.Command { //nolint:funlen + return &cobra.Command{ + Use: "reset", + Short: "Reset workers of all or specific RoadRunner service", + RunE: func(_ *cobra.Command, args []string) error { + const ( + op = errors.Op("reset_handler") + resetterList = "resetter.List" + resetterReset = "resetter.Reset" + ) + + client, err := internalRpc.NewClient(cfgPlugin) + if err != nil { + return err + } + + defer func() { _ = client.Close() }() + + services := args // by default we expect services list from user + if len(services) == 0 { // but if nothing was passed - request all services list + if err = client.Call(resetterList, true, &services); err != nil { + return err + } + } + + var wg sync.WaitGroup + wg.Add(len(services)) + + pr := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithWidth(6)) //nolint:gomnd + + for _, service := range services { + var ( + bar *mpb.Bar + name = runewidth.FillRight(fmt.Sprintf("Resetting plugin: [%s]", color.HiYellowString(service)), 27) + result = make(chan interface{}) + ) + + bar = pr.AddSpinner( + 1, + mpb.SpinnerOnMiddle, + mpb.SpinnerStyle(spinnerStyle), + mpb.PrependDecorators(decor.Name(name)), + mpb.AppendDecorators(onComplete(result)), + ) + + // simulating some work + go func(service string, result chan interface{}) { + defer wg.Done() + defer bar.Increment() + + var done bool + <-client.Go(resetterReset, service, &done, nil).Done + + if err != nil { + result <- errors.E(op, err) + + return + } + + result <- nil + }(service, result) + } + + pr.Wait() + + return nil + }, + } +} + +func onComplete(result chan interface{}) decor.Decorator { + return decor.Any(func(s decor.Statistics) string { + select { + case r := <-result: + if err, ok := r.(error); ok { + return color.HiRedString(err.Error()) + } + + return color.HiGreenString("done") + default: + return "" + } + }) +} diff --git a/internal/cli/reset/command_test.go b/internal/cli/reset/command_test.go new file mode 100644 index 00000000..00cd046e --- /dev/null +++ b/internal/cli/reset/command_test.go @@ -0,0 +1,21 @@ +package reset_test + +import ( + "testing" + + "github.com/spiral/roadrunner-binary/v2/internal/cli/reset" + + "github.com/spiral/roadrunner-plugins/v2/config" + "github.com/stretchr/testify/assert" +) + +func TestCommandProperties(t *testing.T) { + cmd := reset.NewCommand(&config.Plugin{}) + + assert.Equal(t, "reset", cmd.Use) + assert.NotNil(t, cmd.RunE) +} + +func TestExecution(t *testing.T) { + t.Skip("Command execution is not implemented yet") +} diff --git a/internal/cli/root.go b/internal/cli/root.go new file mode 100644 index 00000000..8572bdc6 --- /dev/null +++ b/internal/cli/root.go @@ -0,0 +1,98 @@ +package cli + +import ( + "fmt" + "os" + "path/filepath" + "runtime" + + "github.com/spiral/roadrunner-binary/v2/internal/cli/reset" + "github.com/spiral/roadrunner-binary/v2/internal/cli/serve" + "github.com/spiral/roadrunner-binary/v2/internal/cli/workers" + dbg "github.com/spiral/roadrunner-binary/v2/internal/debug" + "github.com/spiral/roadrunner-binary/v2/internal/meta" + + "github.com/joho/godotenv" + "github.com/spf13/cobra" + "github.com/spiral/roadrunner-plugins/v2/config" +) + +// NewCommand creates root command. +func NewCommand(cmdName string) *cobra.Command { //nolint:funlen + const envDotenv = "DOTENV_PATH" // env var name: path to the .env file + + var ( // flag values + cfgFile string // path to the .rr.yaml + workDir string // working directory + dotenv string // path to the .env file + debug bool // debug mode + override []string // override config values + ) + + var configPlugin = &config.Plugin{} // will be overwritten on pre-run action + + cmd := &cobra.Command{ + Use: cmdName, + Short: "High-performance PHP application server, load-balancer and process manager", + SilenceErrors: true, + SilenceUsage: true, + Version: fmt.Sprintf("%s (build time: %s, %s)", meta.Version(), meta.BuildTime(), runtime.Version()), + PersistentPreRunE: func(*cobra.Command, []string) error { + if cfgFile != "" { + if absPath, err := filepath.Abs(cfgFile); err == nil { + cfgFile = absPath // switch config path to the absolute + + // force working absPath related to config file + if err = os.Chdir(filepath.Dir(absPath)); err != nil { + return err + } + } + } + + if workDir != "" { + if err := os.Chdir(workDir); err != nil { + return err + } + } + + if v, ok := os.LookupEnv(envDotenv); ok { // read path to the dotenv file from environment variable + dotenv = v + } + + if dotenv != "" { + _ = godotenv.Load(dotenv) // error ignored because dotenv is optional feature + } + + cfg := &config.Plugin{Path: cfgFile, Prefix: "rr", Flags: override} + if err := cfg.Init(); err != nil { + return err + } + + if debug { + srv := dbg.NewServer() + go func() { _ = srv.Start(":6061") }() // TODO implement graceful server stopping + } + + // overwrite + *configPlugin = *cfg + + return nil + }, + } + + f := cmd.PersistentFlags() + + f.StringVarP(&cfgFile, "config", "c", ".rr.yaml", "config file") + f.StringVarP(&workDir, "WorkDir", "w", "", "working directory") // TODO change to `workDir`? + f.StringVarP(&dotenv, "dotenv", "", "", fmt.Sprintf("dotenv file [$%s]", envDotenv)) + f.BoolVarP(&debug, "debug", "d", false, "debug mode") + f.StringArrayVarP(&override, "override", "o", nil, "override config value (dot.notation=value)") + + cmd.AddCommand( + workers.NewCommand(configPlugin), + reset.NewCommand(configPlugin), + serve.NewCommand(configPlugin), + ) + + return cmd +} diff --git a/internal/cli/root_test.go b/internal/cli/root_test.go new file mode 100644 index 00000000..59af9294 --- /dev/null +++ b/internal/cli/root_test.go @@ -0,0 +1,85 @@ +package cli_test + +import ( + "testing" + + "github.com/spiral/roadrunner-binary/v2/internal/cli" + + "github.com/spf13/cobra" + "github.com/stretchr/testify/assert" +) + +func TestCommandSubcommands(t *testing.T) { + cmd := cli.NewCommand("unit test") + + cases := []struct { + giveName string + }{ + {giveName: "workers"}, + {giveName: "reset"}, + {giveName: "serve"}, + } + + // get all existing subcommands and put into the map + subcommands := make(map[string]*cobra.Command) + for _, sub := range cmd.Commands() { + subcommands[sub.Name()] = sub + } + + for _, tt := range cases { + tt := tt + t.Run(tt.giveName, func(t *testing.T) { + if _, exists := subcommands[tt.giveName]; !exists { + assert.Failf(t, "command not found", "command [%s] was not found", tt.giveName) + } + }) + } +} + +func TestCommandFlags(t *testing.T) { + cmd := cli.NewCommand("unit test") + + cases := []struct { + giveName string + wantShorthand string + wantDefault string + }{ + {giveName: "config", wantShorthand: "c", wantDefault: ".rr.yaml"}, + {giveName: "WorkDir", wantShorthand: "w", wantDefault: ""}, + {giveName: "dotenv", wantShorthand: "", wantDefault: ""}, + {giveName: "debug", wantShorthand: "d", wantDefault: "false"}, + {giveName: "override", wantShorthand: "o", wantDefault: "[]"}, + } + + for _, tt := range cases { + tt := tt + t.Run(tt.giveName, func(t *testing.T) { + flag := cmd.Flag(tt.giveName) + + if flag == nil { + assert.Failf(t, "flag not found", "flag [%s] was not found", tt.giveName) + + return + } + + assert.Equal(t, tt.wantShorthand, flag.Shorthand) + assert.Equal(t, tt.wantDefault, flag.DefValue) + }) + } +} + +func TestCommandSimpleExecuting(t *testing.T) { + cmd := cli.NewCommand("unit test") + cmd.SetArgs([]string{"-c", "./../../.rr.yaml"}) + + var executed bool + + if cmd.Run == nil { // override "Run" property for test (if it was not set) + cmd.Run = func(cmd *cobra.Command, args []string) { + executed = true + } + } + + assert.NoError(t, cmd.Execute()) + assert.True(t, executed) +} diff --git a/internal/cli/serve/command.go b/internal/cli/serve/command.go new file mode 100644 index 00000000..6679d795 --- /dev/null +++ b/internal/cli/serve/command.go @@ -0,0 +1,104 @@ +package serve + +import ( + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/spiral/roadrunner-binary/v2/internal/container" + "github.com/spiral/roadrunner-binary/v2/internal/meta" + + "github.com/spf13/cobra" + "github.com/spiral/errors" + configImpl "github.com/spiral/roadrunner-plugins/v2/config" +) + +// NewCommand creates `serve` command. +func NewCommand(cfgPlugin *configImpl.Plugin) *cobra.Command { //nolint:funlen + return &cobra.Command{ + Use: "serve", + Short: "Start RoadRunner server", + RunE: func(*cobra.Command, []string) error { + const op = errors.Op("handle_serve_command") + + // create endure container config + containerCfg, err := container.NewConfig(cfgPlugin) + if err != nil { + return errors.E(op, err) + } + + // set the grace period which would be same for all the plugins + cfgPlugin.Timeout = containerCfg.GracePeriod + cfgPlugin.Version = meta.Version() + + // create endure container + endureContainer, err := container.NewContainer(*containerCfg) + if err != nil { + return errors.E(op, err) + } + + // register config plugin + if err = endureContainer.Register(cfgPlugin); err != nil { + return errors.E(op, err) + } + + // register another container plugins + for i, plugins := 0, container.Plugins(); i < len(plugins); i++ { + if err = endureContainer.Register(plugins[i]); err != nil { + return errors.E(op, err) + } + } + + // init container and all services + if err = endureContainer.Init(); err != nil { + return errors.E(op, err) + } + + // start serving the graph + errCh, err := endureContainer.Serve() + if err != nil { + return errors.E(op, err) + } + + oss, stop := make(chan os.Signal, 2), make(chan struct{}, 1) //nolint:gomnd + signal.Notify(oss, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + + go func() { + // first catch - stop the container + <-oss + // send signal to stop execution + stop <- struct{}{} + + // after first hit we are waiting for the second + // second catch - exit from the process + <-oss + fmt.Println("exit forced") + os.Exit(1) + }() + + fmt.Printf("[INFO] RoadRunner server started; version: %s, buildtime: %s\n", meta.Version(), meta.BuildTime()) + + for { + select { + case e := <-errCh: + fmt.Printf("error occurred: %v, plugin: %s\n", e.Error, e.VertexID) + + // return error, container already stopped internally + if !containerCfg.RetryOnFail { + return errors.E(op, e.Error) + } + + case <-stop: // stop the container after first signal + fmt.Printf("stop signal received, grace timeout is: %d seconds\n", uint64(containerCfg.GracePeriod.Seconds())) + + if err = endureContainer.Stop(); err != nil { + fmt.Printf("error occurred during the stopping container: %v\n", err) + } + + return nil + } + } + }, + } +} diff --git a/internal/cli/serve/command_test.go b/internal/cli/serve/command_test.go new file mode 100644 index 00000000..0e61ce83 --- /dev/null +++ b/internal/cli/serve/command_test.go @@ -0,0 +1,21 @@ +package serve_test + +import ( + "testing" + + "github.com/spiral/roadrunner-binary/v2/internal/cli/serve" + + "github.com/spiral/roadrunner-plugins/v2/config" + "github.com/stretchr/testify/assert" +) + +func TestCommandProperties(t *testing.T) { + cmd := serve.NewCommand(&config.Plugin{}) + + assert.Equal(t, "serve", cmd.Use) + assert.NotNil(t, cmd.RunE) +} + +func TestExecution(t *testing.T) { + t.Skip("Command execution is not implemented yet") +} diff --git a/internal/cli/workers/command.go b/internal/cli/workers/command.go new file mode 100644 index 00000000..283887e4 --- /dev/null +++ b/internal/cli/workers/command.go @@ -0,0 +1,143 @@ +package workers + +import ( + "fmt" + "net/rpc" + "os" + "os/signal" + "syscall" + "time" + + "github.com/roadrunner-server/api/v2/plugins/jobs" + internalRpc "github.com/spiral/roadrunner-binary/v2/internal/rpc" + + tm "github.com/buger/goterm" + "github.com/fatih/color" + "github.com/spf13/cobra" + "github.com/spiral/errors" + "github.com/spiral/roadrunner-plugins/v2/config" + "github.com/spiral/roadrunner-plugins/v2/informer" +) + +// NewCommand creates `workers` command. +func NewCommand(cfgPlugin *config.Plugin) *cobra.Command { //nolint:funlen + var ( + // interactive workers updates + interactive bool + ) + + cmd := &cobra.Command{ + Use: "workers", + Short: "Show information about active RoadRunner workers", + RunE: func(_ *cobra.Command, args []string) error { + const ( + op = errors.Op("handle_workers_command") + informerList = "informer.List" + ) + + client, err := internalRpc.NewClient(cfgPlugin) + if err != nil { + return err + } + + defer func() { _ = client.Close() }() + + plugins := args // by default we expect plugins list from user + if len(plugins) == 0 { // but if nothing was passed - request all informers list + if err = client.Call(informerList, true, &plugins); err != nil { + return err + } + } + + if !interactive { + return showWorkers(plugins, client) + } + + oss := make(chan os.Signal, 1) + signal.Notify(oss, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + + tm.Clear() + + tt := time.NewTicker(time.Second) + defer tt.Stop() + + for { + select { + case <-oss: + return nil + + case <-tt.C: + tm.MoveCursor(1, 1) + tm.Flush() + + if err = showWorkers(plugins, client); err != nil { + return errors.E(op, err) + } + } + } + }, + } + + cmd.Flags().BoolVarP( + &interactive, + "interactive", + "i", + false, + "render interactive workers table", + ) + + return cmd +} + +func showWorkers(plugins []string, client *rpc.Client) error { + const ( + op = errors.Op("show_workers") + informerWorkers = "informer.Workers" + informerJobs = "informer.Jobs" + // this is only one exception to Render the workers, service plugin has the same workers as other plugins, + // but they are RAW processes and needs to be handled in a different way. We don't need a special RPC call, but + // need a special render method. + servicePluginName = "service" + ) + + for _, plugin := range plugins { + list := &informer.WorkerList{} + + if err := client.Call(informerWorkers, plugin, &list); err != nil { + return errors.E(op, err) + } + + if len(list.Workers) == 0 { + continue + } + + if plugin == servicePluginName { + fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin)) + ServiceWorkerTable(os.Stdout, list.Workers).Render() + + continue + } + + fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin)) + + WorkerTable(os.Stdout, list.Workers).Render() + } + + for _, plugin := range plugins { + var jst []*jobs.State + + if err := client.Call(informerJobs, plugin, &jst); err != nil { + return errors.E(op, err) + } + + // eq to nil + if len(jst) == 0 { + continue + } + + fmt.Printf("Jobs of [%s]:\n", color.HiYellowString(plugin)) + JobsTable(os.Stdout, jst).Render() + } + + return nil +} diff --git a/internal/cli/workers/command_test.go b/internal/cli/workers/command_test.go new file mode 100644 index 00000000..e593686d --- /dev/null +++ b/internal/cli/workers/command_test.go @@ -0,0 +1,49 @@ +package workers_test + +import ( + "testing" + + "github.com/spiral/roadrunner-binary/v2/internal/cli/workers" + + "github.com/spiral/roadrunner-plugins/v2/config" + "github.com/stretchr/testify/assert" +) + +func TestCommandProperties(t *testing.T) { + cmd := workers.NewCommand(&config.Plugin{}) + + assert.Equal(t, "workers", cmd.Use) + assert.NotNil(t, cmd.RunE) +} + +func TestCommandFlags(t *testing.T) { + cmd := workers.NewCommand(&config.Plugin{}) + + cases := []struct { + giveName string + wantShorthand string + wantDefault string + }{ + {giveName: "interactive", wantShorthand: "i", wantDefault: "false"}, + } + + for _, tt := range cases { + tt := tt + t.Run(tt.giveName, func(t *testing.T) { + flag := cmd.Flag(tt.giveName) + + if flag == nil { + assert.Failf(t, "flag not found", "flag [%s] was not found", tt.giveName) + + return + } + + assert.Equal(t, tt.wantShorthand, flag.Shorthand) + assert.Equal(t, tt.wantDefault, flag.DefValue) + }) + } +} + +func TestExecution(t *testing.T) { + t.Skip("Command execution is not implemented yet") +} diff --git a/internal/cli/workers/render.go b/internal/cli/workers/render.go new file mode 100644 index 00000000..0bdf09b6 --- /dev/null +++ b/internal/cli/workers/render.go @@ -0,0 +1,135 @@ +package workers + +import ( + "io" + "strconv" + "time" + + "github.com/dustin/go-humanize" + "github.com/fatih/color" + "github.com/olekukonko/tablewriter" + "github.com/roadrunner-server/api/v2/plugins/jobs" + "github.com/spiral/roadrunner/v2/state/process" +) + +const ( + Ready string = "READY" + Paused string = "PAUSED/STOPPED" +) + +// WorkerTable renders table with information about rr server workers. +func WorkerTable(writer io.Writer, workers []*process.State) *tablewriter.Table { + tw := tablewriter.NewWriter(writer) + tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "CPU%", "Created"}) + tw.SetColMinWidth(0, 7) + tw.SetColMinWidth(1, 9) + tw.SetColMinWidth(2, 7) + tw.SetColMinWidth(3, 7) + tw.SetColMinWidth(4, 7) + tw.SetColMinWidth(5, 18) + + for i := 0; i < len(workers); i++ { + tw.Append([]string{ + strconv.Itoa(workers[i].Pid), + renderStatus(workers[i].Status), + renderJobs(workers[i].NumJobs), + humanize.Bytes(workers[i].MemoryUsage), + renderCPU(workers[i].CPUPercent), + renderAlive(time.Unix(0, workers[i].Created)), + }) + } + + return tw +} + +// ServiceWorkerTable renders table with information about rr server workers. +func ServiceWorkerTable(writer io.Writer, workers []*process.State) *tablewriter.Table { + tw := tablewriter.NewWriter(writer) + tw.SetAutoWrapText(false) + tw.SetHeader([]string{"PID", "Memory", "CPU%", "Command"}) + tw.SetColMinWidth(0, 7) + tw.SetColMinWidth(1, 7) + tw.SetColMinWidth(2, 7) + tw.SetColMinWidth(3, 18) + tw.SetAlignment(tablewriter.ALIGN_LEFT) + + for i := 0; i < len(workers); i++ { + tw.Append([]string{ + strconv.Itoa(workers[i].Pid), + humanize.Bytes(workers[i].MemoryUsage), + renderCPU(workers[i].CPUPercent), + workers[i].Command, + }) + } + + return tw +} + +// JobsTable renders table with information about rr server jobs. +func JobsTable(writer io.Writer, jobs []*jobs.State) *tablewriter.Table { + tw := tablewriter.NewWriter(writer) + tw.SetAutoWrapText(false) + tw.SetHeader([]string{"Status", "Pipeline", "Driver", "Queue", "Active", "Delayed", "Reserved"}) + tw.SetColWidth(10) + tw.SetColWidth(10) + tw.SetColWidth(7) + tw.SetColWidth(15) + tw.SetColWidth(10) + tw.SetColWidth(10) + tw.SetColWidth(10) + tw.SetAlignment(tablewriter.ALIGN_LEFT) + + for i := 0; i < len(jobs); i++ { + tw.Append([]string{ + renderReady(jobs[i].Ready), + jobs[i].Pipeline, + jobs[i].Driver, + jobs[i].Queue, + strconv.Itoa(int(jobs[i].Active)), + strconv.Itoa(int(jobs[i].Delayed)), + strconv.Itoa(int(jobs[i].Reserved)), + }) + } + + return tw +} + +func renderReady(ready bool) string { + if ready { + return Ready + } + + return Paused +} + +//go:inline +func renderCPU(cpu float64) string { + return strconv.FormatFloat(cpu, 'f', 2, 64) +} + +func renderStatus(status string) string { + switch status { + case "inactive": + return color.YellowString("inactive") + case "ready": + return color.CyanString("ready") + case "working": + return color.GreenString("working") + case "invalid": + return color.YellowString("invalid") + case "stopped": + return color.RedString("stopped") + case "errored": + return color.RedString("errored") + default: + return status + } +} + +func renderJobs(number uint64) string { + return humanize.Comma(int64(number)) +} + +func renderAlive(t time.Time) string { + return humanize.RelTime(t, time.Now(), "ago", "") +} diff --git a/internal/container/config.go b/internal/container/config.go new file mode 100644 index 00000000..54e2bb5b --- /dev/null +++ b/internal/container/config.go @@ -0,0 +1,83 @@ +package container + +import ( + "fmt" + "time" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner-plugins/v2/config" +) + +type Config struct { + GracePeriod time.Duration + PrintGraph bool + RetryOnFail bool // TODO check for races, disabled at this moment + LogLevel endure.Level +} + +const ( + endureKey = "endure" + defaultGracePeriod = time.Second * 30 +) + +// NewConfig creates endure container configuration. +func NewConfig(cfgPlugin *config.Plugin) (*Config, error) { + if !cfgPlugin.Has(endureKey) { + return &Config{ // return config with defaults + GracePeriod: defaultGracePeriod, + PrintGraph: false, + RetryOnFail: false, + LogLevel: endure.ErrorLevel, + }, nil + } + + rrCfgEndure := struct { + GracePeriod time.Duration `mapstructure:"grace_period"` + PrintGraph bool `mapstructure:"print_graph"` + RetryOnFail bool `mapstructure:"retry_on_fail"` + LogLevel string `mapstructure:"log_level"` + }{} + + if err := cfgPlugin.UnmarshalKey(endureKey, &rrCfgEndure); err != nil { + return nil, err + } + + if rrCfgEndure.GracePeriod == 0 { + rrCfgEndure.GracePeriod = defaultGracePeriod + } + + if rrCfgEndure.LogLevel == "" { + rrCfgEndure.LogLevel = "error" + } + + logLevel, err := parseLogLevel(rrCfgEndure.LogLevel) + if err != nil { + return nil, err + } + + return &Config{ + GracePeriod: rrCfgEndure.GracePeriod, + PrintGraph: rrCfgEndure.PrintGraph, + RetryOnFail: rrCfgEndure.RetryOnFail, + LogLevel: logLevel, + }, nil +} + +func parseLogLevel(s string) (endure.Level, error) { + switch s { + case "debug": + return endure.DebugLevel, nil + case "info": + return endure.InfoLevel, nil + case "warn", "warning": + return endure.WarnLevel, nil + case "error": + return endure.ErrorLevel, nil + case "panic": + return endure.PanicLevel, nil + case "fatal": + return endure.FatalLevel, nil + } + + return endure.DebugLevel, fmt.Errorf(`unknown log level "%s" (allowed: debug, info, warn, error, panic, fatal)`, s) +} diff --git a/internal/container/config_test.go b/internal/container/config_test.go new file mode 100644 index 00000000..9919def4 --- /dev/null +++ b/internal/container/config_test.go @@ -0,0 +1,82 @@ +package container_test + +import ( + "testing" + "time" + + "github.com/spiral/roadrunner-binary/v2/internal/container" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/roadrunner-plugins/v2/config" + "github.com/stretchr/testify/assert" +) + +func TestNewConfig_SuccessfulReading(t *testing.T) { + cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte(` +endure: + grace_period: 10s + print_graph: true + retry_on_fail: true + log_level: warn +`)} + assert.NoError(t, cfgPlugin.Init()) + + c, err := container.NewConfig(cfgPlugin) + assert.NoError(t, err) + assert.NotNil(t, c) + + assert.Equal(t, time.Second*10, c.GracePeriod) + assert.True(t, c.PrintGraph) + assert.True(t, c.RetryOnFail) + assert.Equal(t, endure.WarnLevel, c.LogLevel) +} + +func TestNewConfig_WithoutEndureKey(t *testing.T) { + cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte{}} + assert.NoError(t, cfgPlugin.Init()) + + c, err := container.NewConfig(cfgPlugin) + assert.NoError(t, err) + assert.NotNil(t, c) + + assert.Equal(t, time.Second*30, c.GracePeriod) + assert.False(t, c.PrintGraph) + assert.False(t, c.RetryOnFail) + assert.Equal(t, endure.ErrorLevel, c.LogLevel) +} + +func TestNewConfig_LoggingLevels(t *testing.T) { + for _, tt := range []struct { + giveLevel string + wantLevel endure.Level + wantError bool + }{ + {giveLevel: "debug", wantLevel: endure.DebugLevel}, + {giveLevel: "info", wantLevel: endure.InfoLevel}, + {giveLevel: "warn", wantLevel: endure.WarnLevel}, + {giveLevel: "warning", wantLevel: endure.WarnLevel}, + {giveLevel: "error", wantLevel: endure.ErrorLevel}, + {giveLevel: "panic", wantLevel: endure.PanicLevel}, + {giveLevel: "fatal", wantLevel: endure.FatalLevel}, + + {giveLevel: "foobar", wantError: true}, + } { + tt := tt + t.Run(tt.giveLevel, func(t *testing.T) { + cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte("endure:\n log_level: " + tt.giveLevel)} + assert.NoError(t, cfgPlugin.Init()) + + c, err := container.NewConfig(cfgPlugin) + + if tt.wantError { + assert.Nil(t, c) + assert.Error(t, err) + assert.Contains(t, err.Error(), "unknown log level") + } else { + assert.NoError(t, err) + assert.NotNil(t, c) + assert.Equal(t, tt.wantLevel, c.LogLevel) + } + }) + } +} diff --git a/internal/container/container.go b/internal/container/container.go new file mode 100644 index 00000000..aa767b2e --- /dev/null +++ b/internal/container/container.go @@ -0,0 +1,21 @@ +package container + +import ( + endure "github.com/spiral/endure/pkg/container" +) + +// NewContainer creates endure container with all required options (based on container Config). Logger is nil by +// default. +func NewContainer(cfg Config) (*endure.Endure, error) { + endureOptions := []endure.Options{ + endure.SetLogLevel(cfg.LogLevel), + endure.RetryOnFail(cfg.RetryOnFail), + endure.GracefulShutdownTimeout(cfg.GracePeriod), + } + + if cfg.PrintGraph { + endureOptions = append(endureOptions, endure.Visualize(endure.StdOut, "")) + } + + return endure.NewContainer(nil, endureOptions...) +} diff --git a/internal/container/container_test.go b/internal/container/container_test.go new file mode 100644 index 00000000..c6d613a0 --- /dev/null +++ b/internal/container/container_test.go @@ -0,0 +1,27 @@ +package container_test + +import ( + "testing" + "time" + + "github.com/spiral/roadrunner-binary/v2/internal/container" + + endure "github.com/spiral/endure/pkg/container" + "github.com/stretchr/testify/assert" +) + +func TestNewContainer(t *testing.T) { // there is no legal way to test container options + c, err := container.NewContainer(container.Config{}) + c2, err2 := container.NewContainer(container.Config{ + GracePeriod: time.Second, + PrintGraph: true, + RetryOnFail: true, + LogLevel: endure.WarnLevel, + }) + + assert.NoError(t, err) + assert.NotNil(t, c) + + assert.NoError(t, err2) + assert.NotNil(t, c2) +} diff --git a/internal/container/plugins.go b/internal/container/plugins.go new file mode 100644 index 00000000..6c962793 --- /dev/null +++ b/internal/container/plugins.go @@ -0,0 +1,104 @@ +package container + +import ( + "github.com/spiral/roadrunner-plugins/v2/amqp" + "github.com/spiral/roadrunner-plugins/v2/beanstalk" + "github.com/spiral/roadrunner-plugins/v2/boltdb" + "github.com/spiral/roadrunner-plugins/v2/broadcast" + "github.com/spiral/roadrunner-plugins/v2/fileserver" + grpcPlugin "github.com/spiral/roadrunner-plugins/v2/grpc" + httpPlugin "github.com/spiral/roadrunner-plugins/v2/http" + "github.com/spiral/roadrunner-plugins/v2/http/middleware/gzip" + "github.com/spiral/roadrunner-plugins/v2/http/middleware/headers" + newrelic "github.com/spiral/roadrunner-plugins/v2/http/middleware/new_relic" + "github.com/spiral/roadrunner-plugins/v2/http/middleware/prometheus" + "github.com/spiral/roadrunner-plugins/v2/http/middleware/static" + "github.com/spiral/roadrunner-plugins/v2/http/middleware/websockets" + "github.com/spiral/roadrunner-plugins/v2/informer" + "github.com/spiral/roadrunner-plugins/v2/jobs" + "github.com/spiral/roadrunner-plugins/v2/kv" + "github.com/spiral/roadrunner-plugins/v2/logger" + "github.com/spiral/roadrunner-plugins/v2/memcached" + "github.com/spiral/roadrunner-plugins/v2/memory" + "github.com/spiral/roadrunner-plugins/v2/metrics" + "github.com/spiral/roadrunner-plugins/v2/nats" + "github.com/spiral/roadrunner-plugins/v2/redis" + "github.com/spiral/roadrunner-plugins/v2/reload" + "github.com/spiral/roadrunner-plugins/v2/resetter" + rpcPlugin "github.com/spiral/roadrunner-plugins/v2/rpc" + "github.com/spiral/roadrunner-plugins/v2/server" + "github.com/spiral/roadrunner-plugins/v2/service" + "github.com/spiral/roadrunner-plugins/v2/sqs" + "github.com/spiral/roadrunner-plugins/v2/status" + "github.com/spiral/roadrunner-plugins/v2/tcp" + roadrunner_temporal "github.com/temporalio/roadrunner-temporal" +) + +// Plugins returns active plugins for the endure container. Feel free to add or remove any plugins. +func Plugins() []interface{} { //nolint:funlen + return []interface{}{ + // bundled + // informer plugin (./rr workers, ./rr workers -i) + &informer.Plugin{}, + // resetter plugin (./rr reset) + &resetter.Plugin{}, + + // logger plugin + &logger.ZapLogger{}, + // metrics plugin + &metrics.Plugin{}, + // reload plugin + &reload.Plugin{}, + // rpc plugin (workers, reset) + &rpcPlugin.Plugin{}, + // server plugin (NewWorker, NewWorkerPool) + &server.Plugin{}, + // service plugin + &service.Plugin{}, + + // ========= JOBS bundle + &jobs.Plugin{}, + &amqp.Plugin{}, + &sqs.Plugin{}, + &nats.Plugin{}, + &beanstalk.Plugin{}, + // ========= + + // http server plugin with middleware + &httpPlugin.Plugin{}, + &newrelic.Plugin{}, + &static.Plugin{}, + &headers.Plugin{}, + &status.Plugin{}, + &gzip.Plugin{}, + &prometheus.Plugin{}, + + &fileserver.Plugin{}, + // =================== + + &grpcPlugin.Plugin{}, + // kv + ws + jobs plugin + &memory.Plugin{}, + // KV + Jobs + &boltdb.Plugin{}, + + // broadcast via memory or redis + // used in conjunction with Websockets, memory and redis plugins + &broadcast.Plugin{}, + // ======== websockets broadcast bundle + &websockets.Plugin{}, + &redis.Plugin{}, + // ========= + + // ============== KV + &kv.Plugin{}, + &memcached.Plugin{}, + // ============== + + // raw TCP connections handling + &tcp.Plugin{}, + + // temporal plugins + &roadrunner_temporal.Plugin{}, + } +} diff --git a/internal/container/plugins_test.go b/internal/container/plugins_test.go new file mode 100644 index 00000000..da639f7d --- /dev/null +++ b/internal/container/plugins_test.go @@ -0,0 +1,20 @@ +package container_test + +import ( + "reflect" + "testing" + + "github.com/spiral/roadrunner-binary/v2/internal/container" +) + +func TestPlugins(t *testing.T) { + for _, p := range container.Plugins() { + if p == nil { + t.Error("plugin cannot be nil") + } + + if pk := reflect.TypeOf(p).Kind(); pk != reflect.Ptr && pk != reflect.Struct { + t.Errorf("plugin %v must be a structure or pointer to the structure", p) + } + } +} diff --git a/internal/debug/server.go b/internal/debug/server.go new file mode 100644 index 00000000..c07a4549 --- /dev/null +++ b/internal/debug/server.go @@ -0,0 +1,37 @@ +package debug + +import ( + "context" + "net/http" + "net/http/pprof" +) + +// Server is a HTTP server for debugging. +type Server struct { + srv *http.Server +} + +// NewServer creates new HTTP server for debugging. +func NewServer() Server { + mux := http.NewServeMux() + + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + return Server{srv: &http.Server{Handler: mux}} +} + +// Start debug server. +func (s *Server) Start(addr string) error { + s.srv.Addr = addr + + return s.srv.ListenAndServe() +} + +// Stop debug server. +func (s *Server) Stop(ctx context.Context) error { + return s.srv.Shutdown(ctx) +} diff --git a/internal/debug/server_test.go b/internal/debug/server_test.go new file mode 100644 index 00000000..d2e1f9f0 --- /dev/null +++ b/internal/debug/server_test.go @@ -0,0 +1,57 @@ +package debug_test + +import ( + "context" + "math/rand" + "net" + "net/http" + "strconv" + "testing" + "time" + + "github.com/spiral/roadrunner-binary/v2/internal/debug" + + "github.com/stretchr/testify/assert" +) + +func TestServer_StartingAndStopping(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + var ( + s = debug.NewServer() + port = strconv.Itoa(rand.Intn(10000) + 10000) //nolint:gosec + ) + + go func() { assert.ErrorIs(t, s.Start(":"+port), http.ErrServerClosed) }() + + defer func() { assert.NoError(t, s.Stop(context.Background())) }() + + for i := 0; i < 100; i++ { // wait for server started state + if l, err := net.Dial("tcp", ":"+port); err != nil { + <-time.After(time.Millisecond) + } else { + _ = l.Close() + + break + } + } + + for _, uri := range []string{ // assert that pprof handlers exists + "http://127.0.0.1:" + port + "/debug/pprof/", + "http://127.0.0.1:" + port + "/debug/pprof/cmdline", + // "http://127.0.0.1:" + port + "/debug/pprof/profile", + "http://127.0.0.1:" + port + "/debug/pprof/symbol", + // "http://127.0.0.1:" + port + "/debug/pprof/trace", + } { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + + req, _ := http.NewRequestWithContext(ctx, http.MethodHead, uri, http.NoBody) + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + _ = resp.Body.Close() + + cancel() + } +} diff --git a/internal/meta/meta.go b/internal/meta/meta.go new file mode 100644 index 00000000..0c5a0556 --- /dev/null +++ b/internal/meta/meta.go @@ -0,0 +1,23 @@ +package meta + +import "strings" + +// next variables will be set during compilation (do NOT rename them). +var ( + version = "local" + buildTime = "development" //nolint:gochecknoglobals +) + +// Version returns version value (without `v` prefix). +func Version() string { + v := strings.TrimSpace(version) + + if len(v) > 1 && ((v[0] == 'v' || v[0] == 'V') && (v[1] >= '0' && v[1] <= '9')) { + return v[1:] + } + + return v +} + +// BuildTime returns application building time. +func BuildTime() string { return buildTime } diff --git a/internal/meta/meta_test.go b/internal/meta/meta_test.go new file mode 100644 index 00000000..32dee122 --- /dev/null +++ b/internal/meta/meta_test.go @@ -0,0 +1,49 @@ +package meta + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestVersion(t *testing.T) { + for give, want := range map[string]string{ + // without changes + "vvv": "vvv", + "victory": "victory", + "voodoo": "voodoo", + "foo": "foo", + "0.0.0": "0.0.0", + "v": "v", + "V": "V", + + // "v" prefix removal + "v0.0.0": "0.0.0", + "V0.0.0": "0.0.0", + "v1": "1", + "V1": "1", + + // with spaces + " 0.0.0": "0.0.0", + "v0.0.0 ": "0.0.0", + " V0.0.0": "0.0.0", + "v1 ": "1", + " V1": "1", + "v ": "v", + } { + version = give + + assert.Equal(t, want, Version()) + } +} + +func TestBuildTime(t *testing.T) { + for give, want := range map[string]string{ + "development": "development", + "2021-03-26T13:50:31+0500": "2021-03-26T13:50:31+0500", + } { + buildTime = give + + assert.Equal(t, want, BuildTime()) + } +} diff --git a/internal/protocol.go b/internal/protocol.go deleted file mode 100755 index cefd685d..00000000 --- a/internal/protocol.go +++ /dev/null @@ -1,111 +0,0 @@ -package internal - -import ( - "os" - "sync" - - json "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/goridge/v3/pkg/frame" - "github.com/spiral/goridge/v3/pkg/relay" -) - -type StopCommand struct { - Stop bool `json:"stop"` -} - -type pidCommand struct { - Pid int `json:"pid"` -} - -var fPool = sync.Pool{New: func() interface{} { - return frame.NewFrame() -}} - -func getFrame() *frame.Frame { - return fPool.Get().(*frame.Frame) -} - -func putFrame(f *frame.Frame) { - f.Reset() - fPool.Put(f) -} - -func SendControl(rl relay.Relay, payload interface{}) error { - fr := getFrame() - defer putFrame(fr) - - fr.WriteVersion(fr.Header(), frame.VERSION_1) - fr.WriteFlags(fr.Header(), frame.CONTROL, frame.CODEC_JSON) - - if data, ok := payload.([]byte); ok { - // check if payload no more that 4Gb - if uint32(len(data)) > ^uint32(0) { - return errors.Str("payload is more that 4gb") - } - - fr.WritePayloadLen(fr.Header(), uint32(len(data))) - fr.WritePayload(data) - fr.WriteCRC(fr.Header()) - - err := rl.Send(fr) - if err != nil { - return err - } - return nil - } - - data, err := json.Marshal(payload) - if err != nil { - return errors.Errorf("invalid payload: %s", err) - } - - fr.WritePayloadLen(fr.Header(), uint32(len(data))) - fr.WritePayload(data) - fr.WriteCRC(fr.Header()) - - // we don't need a copy here, because frame copy the data before send - err = rl.Send(fr) - if err != nil { - return errors.E(errors.FileNotFound, err) - } - - return nil -} - -func Pid(rl relay.Relay) (int64, error) { - err := SendControl(rl, pidCommand{Pid: os.Getpid()}) - if err != nil { - return 0, err - } - - fr := getFrame() - defer putFrame(fr) - - err = rl.Receive(fr) - if err != nil { - return 0, err - } - - if fr == nil { - return 0, errors.Str("nil frame received") - } - - flags := fr.ReadFlags() - - if flags&frame.CONTROL == 0 { - return 0, errors.Str("unexpected response, header is missing, no CONTROL flag") - } - - link := &pidCommand{} - err = json.Unmarshal(fr.Payload(), link) - if err != nil { - return 0, err - } - - if link.Pid <= 0 { - return 0, errors.Str("pid should be greater than 0") - } - - return int64(link.Pid), nil -} diff --git a/internal/rpc/client.go b/internal/rpc/client.go new file mode 100644 index 00000000..f371a51c --- /dev/null +++ b/internal/rpc/client.go @@ -0,0 +1,33 @@ +// Package prc contains wrapper around RPC client ONLY for internal usage. +package rpc + +import ( + "net/rpc" + + "github.com/spiral/errors" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + "github.com/spiral/roadrunner-plugins/v2/config" + rpcPlugin "github.com/spiral/roadrunner-plugins/v2/rpc" +) + +// NewClient creates client ONLY for internal usage (communication between our application with RR side). +// Client will be connected to the RPC. +func NewClient(cfgPlugin *config.Plugin) (*rpc.Client, error) { + if !cfgPlugin.Has(rpcPlugin.PluginName) { + return nil, errors.E("rpc service disabled") + } + + rpcConfig := &rpcPlugin.Config{} + if err := cfgPlugin.UnmarshalKey(rpcPlugin.PluginName, rpcConfig); err != nil { + return nil, err + } + + rpcConfig.InitDefaults() + + conn, err := rpcConfig.Dialer() + if err != nil { + return nil, err + } + + return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil +} diff --git a/internal/rpc/client_test.go b/internal/rpc/client_test.go new file mode 100644 index 00000000..b39788a2 --- /dev/null +++ b/internal/rpc/client_test.go @@ -0,0 +1,60 @@ +package rpc_test + +import ( + "net" + "testing" + + "github.com/spiral/roadrunner-binary/v2/internal/rpc" + + "github.com/spiral/roadrunner-plugins/v2/config" + "github.com/stretchr/testify/assert" +) + +func TestNewClient_RpcServiceDisabled(t *testing.T) { + cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte{}} + assert.NoError(t, cfgPlugin.Init()) + + c, err := rpc.NewClient(cfgPlugin) + + assert.Nil(t, c) + assert.EqualError(t, err, "rpc service disabled") +} + +func TestNewClient_WrongRcpConfiguration(t *testing.T) { + cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte("rpc:\n $foo bar")} + assert.NoError(t, cfgPlugin.Init()) + + c, err := rpc.NewClient(cfgPlugin) + + assert.Nil(t, c) + assert.Error(t, err) + assert.Contains(t, err.Error(), "config_plugin_unmarshal_key") +} + +func TestNewClient_ConnectionError(t *testing.T) { + cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte("rpc:\n listen: tcp://127.0.0.1:0")} + assert.NoError(t, cfgPlugin.Init()) + + c, err := rpc.NewClient(cfgPlugin) + + assert.Nil(t, c) + assert.Error(t, err) + assert.Contains(t, err.Error(), "connection refused") +} + +func TestNewClient_SuccessfullyConnected(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + assert.NoError(t, err) + + defer func() { assert.NoError(t, l.Close()) }() + + cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte("rpc:\n listen: tcp://" + l.Addr().String())} + assert.NoError(t, cfgPlugin.Init()) + + c, err := rpc.NewClient(cfgPlugin) + + assert.NotNil(t, c) + assert.NoError(t, err) + + defer func() { assert.NoError(t, c.Close()) }() +} |