summaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2022-01-15 12:08:20 +0300
committerValery Piashchynski <[email protected]>2022-01-15 12:08:20 +0300
commit5254c8eb27311e2a8a53a4c90c3829cf1238c563 (patch)
treeb51c9a4c1dd4c25adc511498ce0380a7078c5572 /internal
parent13609dd03dd0d2fa85b9fb850be787bf4e2ea67f (diff)
Repository content update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'internal')
-rw-r--r--internal/cli/reset/command.go104
-rw-r--r--internal/cli/reset/command_test.go21
-rw-r--r--internal/cli/root.go98
-rw-r--r--internal/cli/root_test.go85
-rw-r--r--internal/cli/serve/command.go104
-rw-r--r--internal/cli/serve/command_test.go21
-rw-r--r--internal/cli/workers/command.go143
-rw-r--r--internal/cli/workers/command_test.go49
-rw-r--r--internal/cli/workers/render.go135
-rw-r--r--internal/container/config.go83
-rw-r--r--internal/container/config_test.go82
-rw-r--r--internal/container/container.go21
-rw-r--r--internal/container/container_test.go27
-rw-r--r--internal/container/plugins.go104
-rw-r--r--internal/container/plugins_test.go20
-rw-r--r--internal/debug/server.go37
-rw-r--r--internal/debug/server_test.go57
-rw-r--r--internal/meta/meta.go23
-rw-r--r--internal/meta/meta_test.go49
-rwxr-xr-xinternal/protocol.go111
-rw-r--r--internal/rpc/client.go33
-rw-r--r--internal/rpc/client_test.go60
22 files changed, 1356 insertions, 111 deletions
diff --git a/internal/cli/reset/command.go b/internal/cli/reset/command.go
new file mode 100644
index 00000000..d6cf7087
--- /dev/null
+++ b/internal/cli/reset/command.go
@@ -0,0 +1,104 @@
+package reset
+
+import (
+ "fmt"
+ "sync"
+
+ internalRpc "github.com/spiral/roadrunner-binary/v2/internal/rpc"
+
+ "github.com/fatih/color"
+ "github.com/mattn/go-runewidth"
+ "github.com/spf13/cobra"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ "github.com/vbauerster/mpb/v5"
+ "github.com/vbauerster/mpb/v5/decor"
+)
+
+var spinnerStyle = []string{"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"} //nolint:gochecknoglobals
+
+// NewCommand creates `reset` command.
+func NewCommand(cfgPlugin *config.Plugin) *cobra.Command { //nolint:funlen
+ return &cobra.Command{
+ Use: "reset",
+ Short: "Reset workers of all or specific RoadRunner service",
+ RunE: func(_ *cobra.Command, args []string) error {
+ const (
+ op = errors.Op("reset_handler")
+ resetterList = "resetter.List"
+ resetterReset = "resetter.Reset"
+ )
+
+ client, err := internalRpc.NewClient(cfgPlugin)
+ if err != nil {
+ return err
+ }
+
+ defer func() { _ = client.Close() }()
+
+ services := args // by default we expect services list from user
+ if len(services) == 0 { // but if nothing was passed - request all services list
+ if err = client.Call(resetterList, true, &services); err != nil {
+ return err
+ }
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(len(services))
+
+ pr := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithWidth(6)) //nolint:gomnd
+
+ for _, service := range services {
+ var (
+ bar *mpb.Bar
+ name = runewidth.FillRight(fmt.Sprintf("Resetting plugin: [%s]", color.HiYellowString(service)), 27)
+ result = make(chan interface{})
+ )
+
+ bar = pr.AddSpinner(
+ 1,
+ mpb.SpinnerOnMiddle,
+ mpb.SpinnerStyle(spinnerStyle),
+ mpb.PrependDecorators(decor.Name(name)),
+ mpb.AppendDecorators(onComplete(result)),
+ )
+
+ // simulating some work
+ go func(service string, result chan interface{}) {
+ defer wg.Done()
+ defer bar.Increment()
+
+ var done bool
+ <-client.Go(resetterReset, service, &done, nil).Done
+
+ if err != nil {
+ result <- errors.E(op, err)
+
+ return
+ }
+
+ result <- nil
+ }(service, result)
+ }
+
+ pr.Wait()
+
+ return nil
+ },
+ }
+}
+
+func onComplete(result chan interface{}) decor.Decorator {
+ return decor.Any(func(s decor.Statistics) string {
+ select {
+ case r := <-result:
+ if err, ok := r.(error); ok {
+ return color.HiRedString(err.Error())
+ }
+
+ return color.HiGreenString("done")
+ default:
+ return ""
+ }
+ })
+}
diff --git a/internal/cli/reset/command_test.go b/internal/cli/reset/command_test.go
new file mode 100644
index 00000000..00cd046e
--- /dev/null
+++ b/internal/cli/reset/command_test.go
@@ -0,0 +1,21 @@
+package reset_test
+
+import (
+ "testing"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/cli/reset"
+
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCommandProperties(t *testing.T) {
+ cmd := reset.NewCommand(&config.Plugin{})
+
+ assert.Equal(t, "reset", cmd.Use)
+ assert.NotNil(t, cmd.RunE)
+}
+
+func TestExecution(t *testing.T) {
+ t.Skip("Command execution is not implemented yet")
+}
diff --git a/internal/cli/root.go b/internal/cli/root.go
new file mode 100644
index 00000000..8572bdc6
--- /dev/null
+++ b/internal/cli/root.go
@@ -0,0 +1,98 @@
+package cli
+
+import (
+ "fmt"
+ "os"
+ "path/filepath"
+ "runtime"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/cli/reset"
+ "github.com/spiral/roadrunner-binary/v2/internal/cli/serve"
+ "github.com/spiral/roadrunner-binary/v2/internal/cli/workers"
+ dbg "github.com/spiral/roadrunner-binary/v2/internal/debug"
+ "github.com/spiral/roadrunner-binary/v2/internal/meta"
+
+ "github.com/joho/godotenv"
+ "github.com/spf13/cobra"
+ "github.com/spiral/roadrunner-plugins/v2/config"
+)
+
+// NewCommand creates root command.
+func NewCommand(cmdName string) *cobra.Command { //nolint:funlen
+ const envDotenv = "DOTENV_PATH" // env var name: path to the .env file
+
+ var ( // flag values
+ cfgFile string // path to the .rr.yaml
+ workDir string // working directory
+ dotenv string // path to the .env file
+ debug bool // debug mode
+ override []string // override config values
+ )
+
+ var configPlugin = &config.Plugin{} // will be overwritten on pre-run action
+
+ cmd := &cobra.Command{
+ Use: cmdName,
+ Short: "High-performance PHP application server, load-balancer and process manager",
+ SilenceErrors: true,
+ SilenceUsage: true,
+ Version: fmt.Sprintf("%s (build time: %s, %s)", meta.Version(), meta.BuildTime(), runtime.Version()),
+ PersistentPreRunE: func(*cobra.Command, []string) error {
+ if cfgFile != "" {
+ if absPath, err := filepath.Abs(cfgFile); err == nil {
+ cfgFile = absPath // switch config path to the absolute
+
+ // force working absPath related to config file
+ if err = os.Chdir(filepath.Dir(absPath)); err != nil {
+ return err
+ }
+ }
+ }
+
+ if workDir != "" {
+ if err := os.Chdir(workDir); err != nil {
+ return err
+ }
+ }
+
+ if v, ok := os.LookupEnv(envDotenv); ok { // read path to the dotenv file from environment variable
+ dotenv = v
+ }
+
+ if dotenv != "" {
+ _ = godotenv.Load(dotenv) // error ignored because dotenv is optional feature
+ }
+
+ cfg := &config.Plugin{Path: cfgFile, Prefix: "rr", Flags: override}
+ if err := cfg.Init(); err != nil {
+ return err
+ }
+
+ if debug {
+ srv := dbg.NewServer()
+ go func() { _ = srv.Start(":6061") }() // TODO implement graceful server stopping
+ }
+
+ // overwrite
+ *configPlugin = *cfg
+
+ return nil
+ },
+ }
+
+ f := cmd.PersistentFlags()
+
+ f.StringVarP(&cfgFile, "config", "c", ".rr.yaml", "config file")
+ f.StringVarP(&workDir, "WorkDir", "w", "", "working directory") // TODO change to `workDir`?
+ f.StringVarP(&dotenv, "dotenv", "", "", fmt.Sprintf("dotenv file [$%s]", envDotenv))
+ f.BoolVarP(&debug, "debug", "d", false, "debug mode")
+ f.StringArrayVarP(&override, "override", "o", nil, "override config value (dot.notation=value)")
+
+ cmd.AddCommand(
+ workers.NewCommand(configPlugin),
+ reset.NewCommand(configPlugin),
+ serve.NewCommand(configPlugin),
+ )
+
+ return cmd
+}
diff --git a/internal/cli/root_test.go b/internal/cli/root_test.go
new file mode 100644
index 00000000..59af9294
--- /dev/null
+++ b/internal/cli/root_test.go
@@ -0,0 +1,85 @@
+package cli_test
+
+import (
+ "testing"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/cli"
+
+ "github.com/spf13/cobra"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCommandSubcommands(t *testing.T) {
+ cmd := cli.NewCommand("unit test")
+
+ cases := []struct {
+ giveName string
+ }{
+ {giveName: "workers"},
+ {giveName: "reset"},
+ {giveName: "serve"},
+ }
+
+ // get all existing subcommands and put into the map
+ subcommands := make(map[string]*cobra.Command)
+ for _, sub := range cmd.Commands() {
+ subcommands[sub.Name()] = sub
+ }
+
+ for _, tt := range cases {
+ tt := tt
+ t.Run(tt.giveName, func(t *testing.T) {
+ if _, exists := subcommands[tt.giveName]; !exists {
+ assert.Failf(t, "command not found", "command [%s] was not found", tt.giveName)
+ }
+ })
+ }
+}
+
+func TestCommandFlags(t *testing.T) {
+ cmd := cli.NewCommand("unit test")
+
+ cases := []struct {
+ giveName string
+ wantShorthand string
+ wantDefault string
+ }{
+ {giveName: "config", wantShorthand: "c", wantDefault: ".rr.yaml"},
+ {giveName: "WorkDir", wantShorthand: "w", wantDefault: ""},
+ {giveName: "dotenv", wantShorthand: "", wantDefault: ""},
+ {giveName: "debug", wantShorthand: "d", wantDefault: "false"},
+ {giveName: "override", wantShorthand: "o", wantDefault: "[]"},
+ }
+
+ for _, tt := range cases {
+ tt := tt
+ t.Run(tt.giveName, func(t *testing.T) {
+ flag := cmd.Flag(tt.giveName)
+
+ if flag == nil {
+ assert.Failf(t, "flag not found", "flag [%s] was not found", tt.giveName)
+
+ return
+ }
+
+ assert.Equal(t, tt.wantShorthand, flag.Shorthand)
+ assert.Equal(t, tt.wantDefault, flag.DefValue)
+ })
+ }
+}
+
+func TestCommandSimpleExecuting(t *testing.T) {
+ cmd := cli.NewCommand("unit test")
+ cmd.SetArgs([]string{"-c", "./../../.rr.yaml"})
+
+ var executed bool
+
+ if cmd.Run == nil { // override "Run" property for test (if it was not set)
+ cmd.Run = func(cmd *cobra.Command, args []string) {
+ executed = true
+ }
+ }
+
+ assert.NoError(t, cmd.Execute())
+ assert.True(t, executed)
+}
diff --git a/internal/cli/serve/command.go b/internal/cli/serve/command.go
new file mode 100644
index 00000000..6679d795
--- /dev/null
+++ b/internal/cli/serve/command.go
@@ -0,0 +1,104 @@
+package serve
+
+import (
+ "fmt"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/container"
+ "github.com/spiral/roadrunner-binary/v2/internal/meta"
+
+ "github.com/spf13/cobra"
+ "github.com/spiral/errors"
+ configImpl "github.com/spiral/roadrunner-plugins/v2/config"
+)
+
+// NewCommand creates `serve` command.
+func NewCommand(cfgPlugin *configImpl.Plugin) *cobra.Command { //nolint:funlen
+ return &cobra.Command{
+ Use: "serve",
+ Short: "Start RoadRunner server",
+ RunE: func(*cobra.Command, []string) error {
+ const op = errors.Op("handle_serve_command")
+
+ // create endure container config
+ containerCfg, err := container.NewConfig(cfgPlugin)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // set the grace period which would be same for all the plugins
+ cfgPlugin.Timeout = containerCfg.GracePeriod
+ cfgPlugin.Version = meta.Version()
+
+ // create endure container
+ endureContainer, err := container.NewContainer(*containerCfg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // register config plugin
+ if err = endureContainer.Register(cfgPlugin); err != nil {
+ return errors.E(op, err)
+ }
+
+ // register another container plugins
+ for i, plugins := 0, container.Plugins(); i < len(plugins); i++ {
+ if err = endureContainer.Register(plugins[i]); err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ // init container and all services
+ if err = endureContainer.Init(); err != nil {
+ return errors.E(op, err)
+ }
+
+ // start serving the graph
+ errCh, err := endureContainer.Serve()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ oss, stop := make(chan os.Signal, 2), make(chan struct{}, 1) //nolint:gomnd
+ signal.Notify(oss, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
+
+ go func() {
+ // first catch - stop the container
+ <-oss
+ // send signal to stop execution
+ stop <- struct{}{}
+
+ // after first hit we are waiting for the second
+ // second catch - exit from the process
+ <-oss
+ fmt.Println("exit forced")
+ os.Exit(1)
+ }()
+
+ fmt.Printf("[INFO] RoadRunner server started; version: %s, buildtime: %s\n", meta.Version(), meta.BuildTime())
+
+ for {
+ select {
+ case e := <-errCh:
+ fmt.Printf("error occurred: %v, plugin: %s\n", e.Error, e.VertexID)
+
+ // return error, container already stopped internally
+ if !containerCfg.RetryOnFail {
+ return errors.E(op, e.Error)
+ }
+
+ case <-stop: // stop the container after first signal
+ fmt.Printf("stop signal received, grace timeout is: %d seconds\n", uint64(containerCfg.GracePeriod.Seconds()))
+
+ if err = endureContainer.Stop(); err != nil {
+ fmt.Printf("error occurred during the stopping container: %v\n", err)
+ }
+
+ return nil
+ }
+ }
+ },
+ }
+}
diff --git a/internal/cli/serve/command_test.go b/internal/cli/serve/command_test.go
new file mode 100644
index 00000000..0e61ce83
--- /dev/null
+++ b/internal/cli/serve/command_test.go
@@ -0,0 +1,21 @@
+package serve_test
+
+import (
+ "testing"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/cli/serve"
+
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCommandProperties(t *testing.T) {
+ cmd := serve.NewCommand(&config.Plugin{})
+
+ assert.Equal(t, "serve", cmd.Use)
+ assert.NotNil(t, cmd.RunE)
+}
+
+func TestExecution(t *testing.T) {
+ t.Skip("Command execution is not implemented yet")
+}
diff --git a/internal/cli/workers/command.go b/internal/cli/workers/command.go
new file mode 100644
index 00000000..283887e4
--- /dev/null
+++ b/internal/cli/workers/command.go
@@ -0,0 +1,143 @@
+package workers
+
+import (
+ "fmt"
+ "net/rpc"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "github.com/roadrunner-server/api/v2/plugins/jobs"
+ internalRpc "github.com/spiral/roadrunner-binary/v2/internal/rpc"
+
+ tm "github.com/buger/goterm"
+ "github.com/fatih/color"
+ "github.com/spf13/cobra"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ "github.com/spiral/roadrunner-plugins/v2/informer"
+)
+
+// NewCommand creates `workers` command.
+func NewCommand(cfgPlugin *config.Plugin) *cobra.Command { //nolint:funlen
+ var (
+ // interactive workers updates
+ interactive bool
+ )
+
+ cmd := &cobra.Command{
+ Use: "workers",
+ Short: "Show information about active RoadRunner workers",
+ RunE: func(_ *cobra.Command, args []string) error {
+ const (
+ op = errors.Op("handle_workers_command")
+ informerList = "informer.List"
+ )
+
+ client, err := internalRpc.NewClient(cfgPlugin)
+ if err != nil {
+ return err
+ }
+
+ defer func() { _ = client.Close() }()
+
+ plugins := args // by default we expect plugins list from user
+ if len(plugins) == 0 { // but if nothing was passed - request all informers list
+ if err = client.Call(informerList, true, &plugins); err != nil {
+ return err
+ }
+ }
+
+ if !interactive {
+ return showWorkers(plugins, client)
+ }
+
+ oss := make(chan os.Signal, 1)
+ signal.Notify(oss, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
+
+ tm.Clear()
+
+ tt := time.NewTicker(time.Second)
+ defer tt.Stop()
+
+ for {
+ select {
+ case <-oss:
+ return nil
+
+ case <-tt.C:
+ tm.MoveCursor(1, 1)
+ tm.Flush()
+
+ if err = showWorkers(plugins, client); err != nil {
+ return errors.E(op, err)
+ }
+ }
+ }
+ },
+ }
+
+ cmd.Flags().BoolVarP(
+ &interactive,
+ "interactive",
+ "i",
+ false,
+ "render interactive workers table",
+ )
+
+ return cmd
+}
+
+func showWorkers(plugins []string, client *rpc.Client) error {
+ const (
+ op = errors.Op("show_workers")
+ informerWorkers = "informer.Workers"
+ informerJobs = "informer.Jobs"
+ // this is only one exception to Render the workers, service plugin has the same workers as other plugins,
+ // but they are RAW processes and needs to be handled in a different way. We don't need a special RPC call, but
+ // need a special render method.
+ servicePluginName = "service"
+ )
+
+ for _, plugin := range plugins {
+ list := &informer.WorkerList{}
+
+ if err := client.Call(informerWorkers, plugin, &list); err != nil {
+ return errors.E(op, err)
+ }
+
+ if len(list.Workers) == 0 {
+ continue
+ }
+
+ if plugin == servicePluginName {
+ fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin))
+ ServiceWorkerTable(os.Stdout, list.Workers).Render()
+
+ continue
+ }
+
+ fmt.Printf("Workers of [%s]:\n", color.HiYellowString(plugin))
+
+ WorkerTable(os.Stdout, list.Workers).Render()
+ }
+
+ for _, plugin := range plugins {
+ var jst []*jobs.State
+
+ if err := client.Call(informerJobs, plugin, &jst); err != nil {
+ return errors.E(op, err)
+ }
+
+ // eq to nil
+ if len(jst) == 0 {
+ continue
+ }
+
+ fmt.Printf("Jobs of [%s]:\n", color.HiYellowString(plugin))
+ JobsTable(os.Stdout, jst).Render()
+ }
+
+ return nil
+}
diff --git a/internal/cli/workers/command_test.go b/internal/cli/workers/command_test.go
new file mode 100644
index 00000000..e593686d
--- /dev/null
+++ b/internal/cli/workers/command_test.go
@@ -0,0 +1,49 @@
+package workers_test
+
+import (
+ "testing"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/cli/workers"
+
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestCommandProperties(t *testing.T) {
+ cmd := workers.NewCommand(&config.Plugin{})
+
+ assert.Equal(t, "workers", cmd.Use)
+ assert.NotNil(t, cmd.RunE)
+}
+
+func TestCommandFlags(t *testing.T) {
+ cmd := workers.NewCommand(&config.Plugin{})
+
+ cases := []struct {
+ giveName string
+ wantShorthand string
+ wantDefault string
+ }{
+ {giveName: "interactive", wantShorthand: "i", wantDefault: "false"},
+ }
+
+ for _, tt := range cases {
+ tt := tt
+ t.Run(tt.giveName, func(t *testing.T) {
+ flag := cmd.Flag(tt.giveName)
+
+ if flag == nil {
+ assert.Failf(t, "flag not found", "flag [%s] was not found", tt.giveName)
+
+ return
+ }
+
+ assert.Equal(t, tt.wantShorthand, flag.Shorthand)
+ assert.Equal(t, tt.wantDefault, flag.DefValue)
+ })
+ }
+}
+
+func TestExecution(t *testing.T) {
+ t.Skip("Command execution is not implemented yet")
+}
diff --git a/internal/cli/workers/render.go b/internal/cli/workers/render.go
new file mode 100644
index 00000000..0bdf09b6
--- /dev/null
+++ b/internal/cli/workers/render.go
@@ -0,0 +1,135 @@
+package workers
+
+import (
+ "io"
+ "strconv"
+ "time"
+
+ "github.com/dustin/go-humanize"
+ "github.com/fatih/color"
+ "github.com/olekukonko/tablewriter"
+ "github.com/roadrunner-server/api/v2/plugins/jobs"
+ "github.com/spiral/roadrunner/v2/state/process"
+)
+
+const (
+ Ready string = "READY"
+ Paused string = "PAUSED/STOPPED"
+)
+
+// WorkerTable renders table with information about rr server workers.
+func WorkerTable(writer io.Writer, workers []*process.State) *tablewriter.Table {
+ tw := tablewriter.NewWriter(writer)
+ tw.SetHeader([]string{"PID", "Status", "Execs", "Memory", "CPU%", "Created"})
+ tw.SetColMinWidth(0, 7)
+ tw.SetColMinWidth(1, 9)
+ tw.SetColMinWidth(2, 7)
+ tw.SetColMinWidth(3, 7)
+ tw.SetColMinWidth(4, 7)
+ tw.SetColMinWidth(5, 18)
+
+ for i := 0; i < len(workers); i++ {
+ tw.Append([]string{
+ strconv.Itoa(workers[i].Pid),
+ renderStatus(workers[i].Status),
+ renderJobs(workers[i].NumJobs),
+ humanize.Bytes(workers[i].MemoryUsage),
+ renderCPU(workers[i].CPUPercent),
+ renderAlive(time.Unix(0, workers[i].Created)),
+ })
+ }
+
+ return tw
+}
+
+// ServiceWorkerTable renders table with information about rr server workers.
+func ServiceWorkerTable(writer io.Writer, workers []*process.State) *tablewriter.Table {
+ tw := tablewriter.NewWriter(writer)
+ tw.SetAutoWrapText(false)
+ tw.SetHeader([]string{"PID", "Memory", "CPU%", "Command"})
+ tw.SetColMinWidth(0, 7)
+ tw.SetColMinWidth(1, 7)
+ tw.SetColMinWidth(2, 7)
+ tw.SetColMinWidth(3, 18)
+ tw.SetAlignment(tablewriter.ALIGN_LEFT)
+
+ for i := 0; i < len(workers); i++ {
+ tw.Append([]string{
+ strconv.Itoa(workers[i].Pid),
+ humanize.Bytes(workers[i].MemoryUsage),
+ renderCPU(workers[i].CPUPercent),
+ workers[i].Command,
+ })
+ }
+
+ return tw
+}
+
+// JobsTable renders table with information about rr server jobs.
+func JobsTable(writer io.Writer, jobs []*jobs.State) *tablewriter.Table {
+ tw := tablewriter.NewWriter(writer)
+ tw.SetAutoWrapText(false)
+ tw.SetHeader([]string{"Status", "Pipeline", "Driver", "Queue", "Active", "Delayed", "Reserved"})
+ tw.SetColWidth(10)
+ tw.SetColWidth(10)
+ tw.SetColWidth(7)
+ tw.SetColWidth(15)
+ tw.SetColWidth(10)
+ tw.SetColWidth(10)
+ tw.SetColWidth(10)
+ tw.SetAlignment(tablewriter.ALIGN_LEFT)
+
+ for i := 0; i < len(jobs); i++ {
+ tw.Append([]string{
+ renderReady(jobs[i].Ready),
+ jobs[i].Pipeline,
+ jobs[i].Driver,
+ jobs[i].Queue,
+ strconv.Itoa(int(jobs[i].Active)),
+ strconv.Itoa(int(jobs[i].Delayed)),
+ strconv.Itoa(int(jobs[i].Reserved)),
+ })
+ }
+
+ return tw
+}
+
+func renderReady(ready bool) string {
+ if ready {
+ return Ready
+ }
+
+ return Paused
+}
+
+//go:inline
+func renderCPU(cpu float64) string {
+ return strconv.FormatFloat(cpu, 'f', 2, 64)
+}
+
+func renderStatus(status string) string {
+ switch status {
+ case "inactive":
+ return color.YellowString("inactive")
+ case "ready":
+ return color.CyanString("ready")
+ case "working":
+ return color.GreenString("working")
+ case "invalid":
+ return color.YellowString("invalid")
+ case "stopped":
+ return color.RedString("stopped")
+ case "errored":
+ return color.RedString("errored")
+ default:
+ return status
+ }
+}
+
+func renderJobs(number uint64) string {
+ return humanize.Comma(int64(number))
+}
+
+func renderAlive(t time.Time) string {
+ return humanize.RelTime(t, time.Now(), "ago", "")
+}
diff --git a/internal/container/config.go b/internal/container/config.go
new file mode 100644
index 00000000..54e2bb5b
--- /dev/null
+++ b/internal/container/config.go
@@ -0,0 +1,83 @@
+package container
+
+import (
+ "fmt"
+ "time"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner-plugins/v2/config"
+)
+
+type Config struct {
+ GracePeriod time.Duration
+ PrintGraph bool
+ RetryOnFail bool // TODO check for races, disabled at this moment
+ LogLevel endure.Level
+}
+
+const (
+ endureKey = "endure"
+ defaultGracePeriod = time.Second * 30
+)
+
+// NewConfig creates endure container configuration.
+func NewConfig(cfgPlugin *config.Plugin) (*Config, error) {
+ if !cfgPlugin.Has(endureKey) {
+ return &Config{ // return config with defaults
+ GracePeriod: defaultGracePeriod,
+ PrintGraph: false,
+ RetryOnFail: false,
+ LogLevel: endure.ErrorLevel,
+ }, nil
+ }
+
+ rrCfgEndure := struct {
+ GracePeriod time.Duration `mapstructure:"grace_period"`
+ PrintGraph bool `mapstructure:"print_graph"`
+ RetryOnFail bool `mapstructure:"retry_on_fail"`
+ LogLevel string `mapstructure:"log_level"`
+ }{}
+
+ if err := cfgPlugin.UnmarshalKey(endureKey, &rrCfgEndure); err != nil {
+ return nil, err
+ }
+
+ if rrCfgEndure.GracePeriod == 0 {
+ rrCfgEndure.GracePeriod = defaultGracePeriod
+ }
+
+ if rrCfgEndure.LogLevel == "" {
+ rrCfgEndure.LogLevel = "error"
+ }
+
+ logLevel, err := parseLogLevel(rrCfgEndure.LogLevel)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Config{
+ GracePeriod: rrCfgEndure.GracePeriod,
+ PrintGraph: rrCfgEndure.PrintGraph,
+ RetryOnFail: rrCfgEndure.RetryOnFail,
+ LogLevel: logLevel,
+ }, nil
+}
+
+func parseLogLevel(s string) (endure.Level, error) {
+ switch s {
+ case "debug":
+ return endure.DebugLevel, nil
+ case "info":
+ return endure.InfoLevel, nil
+ case "warn", "warning":
+ return endure.WarnLevel, nil
+ case "error":
+ return endure.ErrorLevel, nil
+ case "panic":
+ return endure.PanicLevel, nil
+ case "fatal":
+ return endure.FatalLevel, nil
+ }
+
+ return endure.DebugLevel, fmt.Errorf(`unknown log level "%s" (allowed: debug, info, warn, error, panic, fatal)`, s)
+}
diff --git a/internal/container/config_test.go b/internal/container/config_test.go
new file mode 100644
index 00000000..9919def4
--- /dev/null
+++ b/internal/container/config_test.go
@@ -0,0 +1,82 @@
+package container_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/container"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewConfig_SuccessfulReading(t *testing.T) {
+ cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte(`
+endure:
+ grace_period: 10s
+ print_graph: true
+ retry_on_fail: true
+ log_level: warn
+`)}
+ assert.NoError(t, cfgPlugin.Init())
+
+ c, err := container.NewConfig(cfgPlugin)
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+
+ assert.Equal(t, time.Second*10, c.GracePeriod)
+ assert.True(t, c.PrintGraph)
+ assert.True(t, c.RetryOnFail)
+ assert.Equal(t, endure.WarnLevel, c.LogLevel)
+}
+
+func TestNewConfig_WithoutEndureKey(t *testing.T) {
+ cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte{}}
+ assert.NoError(t, cfgPlugin.Init())
+
+ c, err := container.NewConfig(cfgPlugin)
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+
+ assert.Equal(t, time.Second*30, c.GracePeriod)
+ assert.False(t, c.PrintGraph)
+ assert.False(t, c.RetryOnFail)
+ assert.Equal(t, endure.ErrorLevel, c.LogLevel)
+}
+
+func TestNewConfig_LoggingLevels(t *testing.T) {
+ for _, tt := range []struct {
+ giveLevel string
+ wantLevel endure.Level
+ wantError bool
+ }{
+ {giveLevel: "debug", wantLevel: endure.DebugLevel},
+ {giveLevel: "info", wantLevel: endure.InfoLevel},
+ {giveLevel: "warn", wantLevel: endure.WarnLevel},
+ {giveLevel: "warning", wantLevel: endure.WarnLevel},
+ {giveLevel: "error", wantLevel: endure.ErrorLevel},
+ {giveLevel: "panic", wantLevel: endure.PanicLevel},
+ {giveLevel: "fatal", wantLevel: endure.FatalLevel},
+
+ {giveLevel: "foobar", wantError: true},
+ } {
+ tt := tt
+ t.Run(tt.giveLevel, func(t *testing.T) {
+ cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte("endure:\n log_level: " + tt.giveLevel)}
+ assert.NoError(t, cfgPlugin.Init())
+
+ c, err := container.NewConfig(cfgPlugin)
+
+ if tt.wantError {
+ assert.Nil(t, c)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "unknown log level")
+ } else {
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+ assert.Equal(t, tt.wantLevel, c.LogLevel)
+ }
+ })
+ }
+}
diff --git a/internal/container/container.go b/internal/container/container.go
new file mode 100644
index 00000000..aa767b2e
--- /dev/null
+++ b/internal/container/container.go
@@ -0,0 +1,21 @@
+package container
+
+import (
+ endure "github.com/spiral/endure/pkg/container"
+)
+
+// NewContainer creates endure container with all required options (based on container Config). Logger is nil by
+// default.
+func NewContainer(cfg Config) (*endure.Endure, error) {
+ endureOptions := []endure.Options{
+ endure.SetLogLevel(cfg.LogLevel),
+ endure.RetryOnFail(cfg.RetryOnFail),
+ endure.GracefulShutdownTimeout(cfg.GracePeriod),
+ }
+
+ if cfg.PrintGraph {
+ endureOptions = append(endureOptions, endure.Visualize(endure.StdOut, ""))
+ }
+
+ return endure.NewContainer(nil, endureOptions...)
+}
diff --git a/internal/container/container_test.go b/internal/container/container_test.go
new file mode 100644
index 00000000..c6d613a0
--- /dev/null
+++ b/internal/container/container_test.go
@@ -0,0 +1,27 @@
+package container_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/container"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewContainer(t *testing.T) { // there is no legal way to test container options
+ c, err := container.NewContainer(container.Config{})
+ c2, err2 := container.NewContainer(container.Config{
+ GracePeriod: time.Second,
+ PrintGraph: true,
+ RetryOnFail: true,
+ LogLevel: endure.WarnLevel,
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, c)
+
+ assert.NoError(t, err2)
+ assert.NotNil(t, c2)
+}
diff --git a/internal/container/plugins.go b/internal/container/plugins.go
new file mode 100644
index 00000000..6c962793
--- /dev/null
+++ b/internal/container/plugins.go
@@ -0,0 +1,104 @@
+package container
+
+import (
+ "github.com/spiral/roadrunner-plugins/v2/amqp"
+ "github.com/spiral/roadrunner-plugins/v2/beanstalk"
+ "github.com/spiral/roadrunner-plugins/v2/boltdb"
+ "github.com/spiral/roadrunner-plugins/v2/broadcast"
+ "github.com/spiral/roadrunner-plugins/v2/fileserver"
+ grpcPlugin "github.com/spiral/roadrunner-plugins/v2/grpc"
+ httpPlugin "github.com/spiral/roadrunner-plugins/v2/http"
+ "github.com/spiral/roadrunner-plugins/v2/http/middleware/gzip"
+ "github.com/spiral/roadrunner-plugins/v2/http/middleware/headers"
+ newrelic "github.com/spiral/roadrunner-plugins/v2/http/middleware/new_relic"
+ "github.com/spiral/roadrunner-plugins/v2/http/middleware/prometheus"
+ "github.com/spiral/roadrunner-plugins/v2/http/middleware/static"
+ "github.com/spiral/roadrunner-plugins/v2/http/middleware/websockets"
+ "github.com/spiral/roadrunner-plugins/v2/informer"
+ "github.com/spiral/roadrunner-plugins/v2/jobs"
+ "github.com/spiral/roadrunner-plugins/v2/kv"
+ "github.com/spiral/roadrunner-plugins/v2/logger"
+ "github.com/spiral/roadrunner-plugins/v2/memcached"
+ "github.com/spiral/roadrunner-plugins/v2/memory"
+ "github.com/spiral/roadrunner-plugins/v2/metrics"
+ "github.com/spiral/roadrunner-plugins/v2/nats"
+ "github.com/spiral/roadrunner-plugins/v2/redis"
+ "github.com/spiral/roadrunner-plugins/v2/reload"
+ "github.com/spiral/roadrunner-plugins/v2/resetter"
+ rpcPlugin "github.com/spiral/roadrunner-plugins/v2/rpc"
+ "github.com/spiral/roadrunner-plugins/v2/server"
+ "github.com/spiral/roadrunner-plugins/v2/service"
+ "github.com/spiral/roadrunner-plugins/v2/sqs"
+ "github.com/spiral/roadrunner-plugins/v2/status"
+ "github.com/spiral/roadrunner-plugins/v2/tcp"
+ roadrunner_temporal "github.com/temporalio/roadrunner-temporal"
+)
+
+// Plugins returns active plugins for the endure container. Feel free to add or remove any plugins.
+func Plugins() []interface{} { //nolint:funlen
+ return []interface{}{
+ // bundled
+ // informer plugin (./rr workers, ./rr workers -i)
+ &informer.Plugin{},
+ // resetter plugin (./rr reset)
+ &resetter.Plugin{},
+
+ // logger plugin
+ &logger.ZapLogger{},
+ // metrics plugin
+ &metrics.Plugin{},
+ // reload plugin
+ &reload.Plugin{},
+ // rpc plugin (workers, reset)
+ &rpcPlugin.Plugin{},
+ // server plugin (NewWorker, NewWorkerPool)
+ &server.Plugin{},
+ // service plugin
+ &service.Plugin{},
+
+ // ========= JOBS bundle
+ &jobs.Plugin{},
+ &amqp.Plugin{},
+ &sqs.Plugin{},
+ &nats.Plugin{},
+ &beanstalk.Plugin{},
+ // =========
+
+ // http server plugin with middleware
+ &httpPlugin.Plugin{},
+ &newrelic.Plugin{},
+ &static.Plugin{},
+ &headers.Plugin{},
+ &status.Plugin{},
+ &gzip.Plugin{},
+ &prometheus.Plugin{},
+
+ &fileserver.Plugin{},
+ // ===================
+
+ &grpcPlugin.Plugin{},
+ // kv + ws + jobs plugin
+ &memory.Plugin{},
+ // KV + Jobs
+ &boltdb.Plugin{},
+
+ // broadcast via memory or redis
+ // used in conjunction with Websockets, memory and redis plugins
+ &broadcast.Plugin{},
+ // ======== websockets broadcast bundle
+ &websockets.Plugin{},
+ &redis.Plugin{},
+ // =========
+
+ // ============== KV
+ &kv.Plugin{},
+ &memcached.Plugin{},
+ // ==============
+
+ // raw TCP connections handling
+ &tcp.Plugin{},
+
+ // temporal plugins
+ &roadrunner_temporal.Plugin{},
+ }
+}
diff --git a/internal/container/plugins_test.go b/internal/container/plugins_test.go
new file mode 100644
index 00000000..da639f7d
--- /dev/null
+++ b/internal/container/plugins_test.go
@@ -0,0 +1,20 @@
+package container_test
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/container"
+)
+
+func TestPlugins(t *testing.T) {
+ for _, p := range container.Plugins() {
+ if p == nil {
+ t.Error("plugin cannot be nil")
+ }
+
+ if pk := reflect.TypeOf(p).Kind(); pk != reflect.Ptr && pk != reflect.Struct {
+ t.Errorf("plugin %v must be a structure or pointer to the structure", p)
+ }
+ }
+}
diff --git a/internal/debug/server.go b/internal/debug/server.go
new file mode 100644
index 00000000..c07a4549
--- /dev/null
+++ b/internal/debug/server.go
@@ -0,0 +1,37 @@
+package debug
+
+import (
+ "context"
+ "net/http"
+ "net/http/pprof"
+)
+
+// Server is a HTTP server for debugging.
+type Server struct {
+ srv *http.Server
+}
+
+// NewServer creates new HTTP server for debugging.
+func NewServer() Server {
+ mux := http.NewServeMux()
+
+ mux.HandleFunc("/debug/pprof/", pprof.Index)
+ mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+ mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
+ mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+ mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+
+ return Server{srv: &http.Server{Handler: mux}}
+}
+
+// Start debug server.
+func (s *Server) Start(addr string) error {
+ s.srv.Addr = addr
+
+ return s.srv.ListenAndServe()
+}
+
+// Stop debug server.
+func (s *Server) Stop(ctx context.Context) error {
+ return s.srv.Shutdown(ctx)
+}
diff --git a/internal/debug/server_test.go b/internal/debug/server_test.go
new file mode 100644
index 00000000..d2e1f9f0
--- /dev/null
+++ b/internal/debug/server_test.go
@@ -0,0 +1,57 @@
+package debug_test
+
+import (
+ "context"
+ "math/rand"
+ "net"
+ "net/http"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/debug"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestServer_StartingAndStopping(t *testing.T) {
+ rand.Seed(time.Now().UnixNano())
+
+ var (
+ s = debug.NewServer()
+ port = strconv.Itoa(rand.Intn(10000) + 10000) //nolint:gosec
+ )
+
+ go func() { assert.ErrorIs(t, s.Start(":"+port), http.ErrServerClosed) }()
+
+ defer func() { assert.NoError(t, s.Stop(context.Background())) }()
+
+ for i := 0; i < 100; i++ { // wait for server started state
+ if l, err := net.Dial("tcp", ":"+port); err != nil {
+ <-time.After(time.Millisecond)
+ } else {
+ _ = l.Close()
+
+ break
+ }
+ }
+
+ for _, uri := range []string{ // assert that pprof handlers exists
+ "http://127.0.0.1:" + port + "/debug/pprof/",
+ "http://127.0.0.1:" + port + "/debug/pprof/cmdline",
+ // "http://127.0.0.1:" + port + "/debug/pprof/profile",
+ "http://127.0.0.1:" + port + "/debug/pprof/symbol",
+ // "http://127.0.0.1:" + port + "/debug/pprof/trace",
+ } {
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+
+ req, _ := http.NewRequestWithContext(ctx, http.MethodHead, uri, http.NoBody)
+ resp, err := http.DefaultClient.Do(req)
+ assert.NoError(t, err)
+ assert.Equal(t, http.StatusOK, resp.StatusCode)
+
+ _ = resp.Body.Close()
+
+ cancel()
+ }
+}
diff --git a/internal/meta/meta.go b/internal/meta/meta.go
new file mode 100644
index 00000000..0c5a0556
--- /dev/null
+++ b/internal/meta/meta.go
@@ -0,0 +1,23 @@
+package meta
+
+import "strings"
+
+// next variables will be set during compilation (do NOT rename them).
+var (
+ version = "local"
+ buildTime = "development" //nolint:gochecknoglobals
+)
+
+// Version returns version value (without `v` prefix).
+func Version() string {
+ v := strings.TrimSpace(version)
+
+ if len(v) > 1 && ((v[0] == 'v' || v[0] == 'V') && (v[1] >= '0' && v[1] <= '9')) {
+ return v[1:]
+ }
+
+ return v
+}
+
+// BuildTime returns application building time.
+func BuildTime() string { return buildTime }
diff --git a/internal/meta/meta_test.go b/internal/meta/meta_test.go
new file mode 100644
index 00000000..32dee122
--- /dev/null
+++ b/internal/meta/meta_test.go
@@ -0,0 +1,49 @@
+package meta
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestVersion(t *testing.T) {
+ for give, want := range map[string]string{
+ // without changes
+ "vvv": "vvv",
+ "victory": "victory",
+ "voodoo": "voodoo",
+ "foo": "foo",
+ "0.0.0": "0.0.0",
+ "v": "v",
+ "V": "V",
+
+ // "v" prefix removal
+ "v0.0.0": "0.0.0",
+ "V0.0.0": "0.0.0",
+ "v1": "1",
+ "V1": "1",
+
+ // with spaces
+ " 0.0.0": "0.0.0",
+ "v0.0.0 ": "0.0.0",
+ " V0.0.0": "0.0.0",
+ "v1 ": "1",
+ " V1": "1",
+ "v ": "v",
+ } {
+ version = give
+
+ assert.Equal(t, want, Version())
+ }
+}
+
+func TestBuildTime(t *testing.T) {
+ for give, want := range map[string]string{
+ "development": "development",
+ "2021-03-26T13:50:31+0500": "2021-03-26T13:50:31+0500",
+ } {
+ buildTime = give
+
+ assert.Equal(t, want, BuildTime())
+ }
+}
diff --git a/internal/protocol.go b/internal/protocol.go
deleted file mode 100755
index cefd685d..00000000
--- a/internal/protocol.go
+++ /dev/null
@@ -1,111 +0,0 @@
-package internal
-
-import (
- "os"
- "sync"
-
- json "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/goridge/v3/pkg/frame"
- "github.com/spiral/goridge/v3/pkg/relay"
-)
-
-type StopCommand struct {
- Stop bool `json:"stop"`
-}
-
-type pidCommand struct {
- Pid int `json:"pid"`
-}
-
-var fPool = sync.Pool{New: func() interface{} {
- return frame.NewFrame()
-}}
-
-func getFrame() *frame.Frame {
- return fPool.Get().(*frame.Frame)
-}
-
-func putFrame(f *frame.Frame) {
- f.Reset()
- fPool.Put(f)
-}
-
-func SendControl(rl relay.Relay, payload interface{}) error {
- fr := getFrame()
- defer putFrame(fr)
-
- fr.WriteVersion(fr.Header(), frame.VERSION_1)
- fr.WriteFlags(fr.Header(), frame.CONTROL, frame.CODEC_JSON)
-
- if data, ok := payload.([]byte); ok {
- // check if payload no more that 4Gb
- if uint32(len(data)) > ^uint32(0) {
- return errors.Str("payload is more that 4gb")
- }
-
- fr.WritePayloadLen(fr.Header(), uint32(len(data)))
- fr.WritePayload(data)
- fr.WriteCRC(fr.Header())
-
- err := rl.Send(fr)
- if err != nil {
- return err
- }
- return nil
- }
-
- data, err := json.Marshal(payload)
- if err != nil {
- return errors.Errorf("invalid payload: %s", err)
- }
-
- fr.WritePayloadLen(fr.Header(), uint32(len(data)))
- fr.WritePayload(data)
- fr.WriteCRC(fr.Header())
-
- // we don't need a copy here, because frame copy the data before send
- err = rl.Send(fr)
- if err != nil {
- return errors.E(errors.FileNotFound, err)
- }
-
- return nil
-}
-
-func Pid(rl relay.Relay) (int64, error) {
- err := SendControl(rl, pidCommand{Pid: os.Getpid()})
- if err != nil {
- return 0, err
- }
-
- fr := getFrame()
- defer putFrame(fr)
-
- err = rl.Receive(fr)
- if err != nil {
- return 0, err
- }
-
- if fr == nil {
- return 0, errors.Str("nil frame received")
- }
-
- flags := fr.ReadFlags()
-
- if flags&frame.CONTROL == 0 {
- return 0, errors.Str("unexpected response, header is missing, no CONTROL flag")
- }
-
- link := &pidCommand{}
- err = json.Unmarshal(fr.Payload(), link)
- if err != nil {
- return 0, err
- }
-
- if link.Pid <= 0 {
- return 0, errors.Str("pid should be greater than 0")
- }
-
- return int64(link.Pid), nil
-}
diff --git a/internal/rpc/client.go b/internal/rpc/client.go
new file mode 100644
index 00000000..f371a51c
--- /dev/null
+++ b/internal/rpc/client.go
@@ -0,0 +1,33 @@
+// Package prc contains wrapper around RPC client ONLY for internal usage.
+package rpc
+
+import (
+ "net/rpc"
+
+ "github.com/spiral/errors"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ rpcPlugin "github.com/spiral/roadrunner-plugins/v2/rpc"
+)
+
+// NewClient creates client ONLY for internal usage (communication between our application with RR side).
+// Client will be connected to the RPC.
+func NewClient(cfgPlugin *config.Plugin) (*rpc.Client, error) {
+ if !cfgPlugin.Has(rpcPlugin.PluginName) {
+ return nil, errors.E("rpc service disabled")
+ }
+
+ rpcConfig := &rpcPlugin.Config{}
+ if err := cfgPlugin.UnmarshalKey(rpcPlugin.PluginName, rpcConfig); err != nil {
+ return nil, err
+ }
+
+ rpcConfig.InitDefaults()
+
+ conn, err := rpcConfig.Dialer()
+ if err != nil {
+ return nil, err
+ }
+
+ return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
+}
diff --git a/internal/rpc/client_test.go b/internal/rpc/client_test.go
new file mode 100644
index 00000000..b39788a2
--- /dev/null
+++ b/internal/rpc/client_test.go
@@ -0,0 +1,60 @@
+package rpc_test
+
+import (
+ "net"
+ "testing"
+
+ "github.com/spiral/roadrunner-binary/v2/internal/rpc"
+
+ "github.com/spiral/roadrunner-plugins/v2/config"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestNewClient_RpcServiceDisabled(t *testing.T) {
+ cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte{}}
+ assert.NoError(t, cfgPlugin.Init())
+
+ c, err := rpc.NewClient(cfgPlugin)
+
+ assert.Nil(t, c)
+ assert.EqualError(t, err, "rpc service disabled")
+}
+
+func TestNewClient_WrongRcpConfiguration(t *testing.T) {
+ cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte("rpc:\n $foo bar")}
+ assert.NoError(t, cfgPlugin.Init())
+
+ c, err := rpc.NewClient(cfgPlugin)
+
+ assert.Nil(t, c)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "config_plugin_unmarshal_key")
+}
+
+func TestNewClient_ConnectionError(t *testing.T) {
+ cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte("rpc:\n listen: tcp://127.0.0.1:0")}
+ assert.NoError(t, cfgPlugin.Init())
+
+ c, err := rpc.NewClient(cfgPlugin)
+
+ assert.Nil(t, c)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "connection refused")
+}
+
+func TestNewClient_SuccessfullyConnected(t *testing.T) {
+ l, err := net.Listen("tcp", "127.0.0.1:0")
+ assert.NoError(t, err)
+
+ defer func() { assert.NoError(t, l.Close()) }()
+
+ cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte("rpc:\n listen: tcp://" + l.Addr().String())}
+ assert.NoError(t, cfgPlugin.Init())
+
+ c, err := rpc.NewClient(cfgPlugin)
+
+ assert.NotNil(t, c)
+ assert.NoError(t, err)
+
+ defer func() { assert.NoError(t, c.Close()) }()
+}