From ac021525b68512f2d855ec7a870a0d2086a5f318 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Wed, 9 Mar 2022 09:09:46 +0100 Subject: viper auto envs and ${} syntax support for the internal RPC client Signed-off-by: Valery Piashchynski --- internal/cli/reset/command.go | 4 +- internal/cli/reset/command_test.go | 2 +- internal/cli/root.go | 4 +- internal/cli/workers/command.go | 4 +- internal/cli/workers/command_test.go | 4 +- internal/rpc/client.go | 87 ++++++++++++++++++++++++++++---- internal/rpc/client_test.go | 59 +++++++++++++++++++--- internal/rpc/test/config_rpc_ok_env.yaml | 2 + 8 files changed, 141 insertions(+), 25 deletions(-) create mode 100644 internal/rpc/test/config_rpc_ok_env.yaml (limited to 'internal') diff --git a/internal/cli/reset/command.go b/internal/cli/reset/command.go index a0634c31..ce29d248 100644 --- a/internal/cli/reset/command.go +++ b/internal/cli/reset/command.go @@ -17,7 +17,7 @@ import ( var spinnerStyle = []string{"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"} //nolint:gochecknoglobals // NewCommand creates `reset` command. -func NewCommand(cfgFile *string) *cobra.Command { //nolint:funlen +func NewCommand(cfgFile *string, override *[]string) *cobra.Command { //nolint:funlen return &cobra.Command{ Use: "reset", Short: "Reset workers of all or specific RoadRunner service", @@ -32,7 +32,7 @@ func NewCommand(cfgFile *string) *cobra.Command { //nolint:funlen return errors.E(op, errors.Str("no configuration file provided")) } - client, err := internalRpc.NewClient(*cfgFile) + client, err := internalRpc.NewClient(*cfgFile, *override) if err != nil { return err } diff --git a/internal/cli/reset/command_test.go b/internal/cli/reset/command_test.go index ca68cb54..57801453 100644 --- a/internal/cli/reset/command_test.go +++ b/internal/cli/reset/command_test.go @@ -10,7 +10,7 @@ import ( func TestCommandProperties(t *testing.T) { path := "" - cmd := reset.NewCommand(&path) + cmd := reset.NewCommand(&path, nil) assert.Equal(t, "reset", cmd.Use) assert.NotNil(t, cmd.RunE) diff --git a/internal/cli/root.go b/internal/cli/root.go index 0f35d416..07fc891c 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -91,8 +91,8 @@ func NewCommand(cmdName string) *cobra.Command { //nolint:funlen f.StringArrayVarP(override, "override", "o", nil, "override config value (dot.notation=value)") cmd.AddCommand( - workers.NewCommand(cfgFile), - reset.NewCommand(cfgFile), + workers.NewCommand(cfgFile, override), + reset.NewCommand(cfgFile, override), serve.NewCommand(override, cfgFile, silent), ) diff --git a/internal/cli/workers/command.go b/internal/cli/workers/command.go index f498b7af..78d8d1d2 100644 --- a/internal/cli/workers/command.go +++ b/internal/cli/workers/command.go @@ -19,7 +19,7 @@ import ( ) // NewCommand creates `workers` command. -func NewCommand(cfgFile *string) *cobra.Command { //nolint:funlen +func NewCommand(cfgFile *string, override *[]string) *cobra.Command { //nolint:funlen var ( // interactive workers updates interactive bool @@ -38,7 +38,7 @@ func NewCommand(cfgFile *string) *cobra.Command { //nolint:funlen return errors.E(op, errors.Str("no configuration file provided")) } - client, err := internalRpc.NewClient(*cfgFile) + client, err := internalRpc.NewClient(*cfgFile, *override) if err != nil { return err } diff --git a/internal/cli/workers/command_test.go b/internal/cli/workers/command_test.go index a3d144df..c68ec9b1 100644 --- a/internal/cli/workers/command_test.go +++ b/internal/cli/workers/command_test.go @@ -9,14 +9,14 @@ import ( ) func TestCommandProperties(t *testing.T) { - cmd := workers.NewCommand(nil) + cmd := workers.NewCommand(nil, nil) assert.Equal(t, "workers", cmd.Use) assert.NotNil(t, cmd.RunE) } func TestCommandFlags(t *testing.T) { - cmd := workers.NewCommand(nil) + cmd := workers.NewCommand(nil, nil) cases := []struct { giveName string diff --git a/internal/rpc/client.go b/internal/rpc/client.go index 7d945add..1e0fbbac 100644 --- a/internal/rpc/client.go +++ b/internal/rpc/client.go @@ -1,19 +1,32 @@ // Package rpc contains wrapper around RPC client ONLY for internal usage. +// Should be in sync with the RPC plugin package rpc import ( + "errors" + "fmt" + "net" "net/rpc" + "os" + "strings" - "github.com/roadrunner-server/errors" goridgeRpc "github.com/roadrunner-server/goridge/v3/pkg/rpc" rpcPlugin "github.com/roadrunner-server/rpc/v2" "github.com/spf13/viper" ) +const ( + prefix string = "rr" + rpcKey string = "rpc.listen" +) + // NewClient creates client ONLY for internal usage (communication between our application with RR side). // Client will be connected to the RPC. -func NewClient(cfg string) (*rpc.Client, error) { +func NewClient(cfg string, flags []string) (*rpc.Client, error) { v := viper.New() + v.AutomaticEnv() + v.SetEnvPrefix(prefix) + v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) v.SetConfigFile(cfg) err := v.ReadInConfig() @@ -21,23 +34,77 @@ func NewClient(cfg string) (*rpc.Client, error) { return nil, err } - if !v.IsSet(rpcPlugin.PluginName) { - return nil, errors.E("rpc service disabled") + // automatically inject ENV variables using ${ENV} pattern + for _, key := range v.AllKeys() { + val := v.Get(key) + if s, ok := val.(string); ok { + v.Set(key, os.ExpandEnv(s)) + } } - rpcConfig := &rpcPlugin.Config{} + // override config Flags + if len(flags) > 0 { + for _, f := range flags { + key, val, errP := parseFlag(f) + if errP != nil { + return nil, errP + } - err = v.UnmarshalKey(rpcPlugin.PluginName, rpcConfig) - if err != nil { - return nil, err + v.Set(key, val) + } } - rpcConfig.InitDefaults() + // rpc.listen might be set by the -o flags or env variable + if !v.IsSet(rpcPlugin.PluginName) { + return nil, errors.New("rpc service not specified in the configuration. Tip: add\n rpc:\n\r listen: rr_rpc_address") + } - conn, err := rpcConfig.Dialer() + conn, err := Dialer(v.GetString(rpcKey)) if err != nil { return nil, err } return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil } + +// Dialer creates rpc socket Dialer. +func Dialer(addr string) (net.Conn, error) { + dsn := strings.Split(addr, "://") + if len(dsn) != 2 { + return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)") + } + + return net.Dial(dsn[0], dsn[1]) +} + +func parseFlag(flag string) (string, string, error) { + if !strings.Contains(flag, "=") { + return "", "", fmt.Errorf("invalid flag `%s`", flag) + } + + parts := strings.SplitN(strings.TrimLeft(flag, " \"'`"), "=", 2) + if len(parts) < 2 { + return "", "", errors.New("usage: -o key=value") + } + + if parts[0] == "" { + return "", "", errors.New("key should not be empty") + } + + if parts[1] == "" { + return "", "", errors.New("value should not be empty") + } + + return strings.Trim(parts[0], " \n\t"), parseValue(strings.Trim(parts[1], " \n\t")), nil +} + +func parseValue(value string) string { + escape := []rune(value)[0] + + if escape == '"' || escape == '\'' || escape == '`' { + value = strings.Trim(value, string(escape)) + value = strings.ReplaceAll(value, fmt.Sprintf("\\%s", string(escape)), string(escape)) + } + + return value +} diff --git a/internal/rpc/client_test.go b/internal/rpc/client_test.go index 0744e167..42b2d87e 100644 --- a/internal/rpc/client_test.go +++ b/internal/rpc/client_test.go @@ -2,9 +2,11 @@ package rpc_test import ( "net" + "os" "testing" "github.com/roadrunner-server/roadrunner/v2/internal/rpc" + "github.com/stretchr/testify/require" "github.com/roadrunner-server/config/v2" "github.com/stretchr/testify/assert" @@ -14,22 +16,22 @@ func TestNewClient_RpcServiceDisabled(t *testing.T) { cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte{}} assert.NoError(t, cfgPlugin.Init()) - c, err := rpc.NewClient("test/config_rpc_empty.yaml") + c, err := rpc.NewClient("test/config_rpc_empty.yaml", nil) assert.Nil(t, c) - assert.EqualError(t, err, "rpc service disabled") + assert.EqualError(t, err, "rpc service not specified") } func TestNewClient_WrongRcpConfiguration(t *testing.T) { - c, err := rpc.NewClient("test/config_rpc_wrong.yaml") + c, err := rpc.NewClient("test/config_rpc_wrong.yaml", nil) assert.Nil(t, c) assert.Error(t, err) - assert.Contains(t, err.Error(), "'' expected a map, got 'string'") + assert.Contains(t, err.Error(), "invalid socket DSN") } func TestNewClient_ConnectionError(t *testing.T) { - c, err := rpc.NewClient("test/config_rpc_conn_err.yaml") + c, err := rpc.NewClient("test/config_rpc_conn_err.yaml", nil) assert.Nil(t, c) assert.Error(t, err) @@ -42,7 +44,52 @@ func TestNewClient_SuccessfullyConnected(t *testing.T) { defer func() { assert.NoError(t, l.Close()) }() - c, err := rpc.NewClient("test/config_rpc_ok.yaml") + c, err := rpc.NewClient("test/config_rpc_ok.yaml", nil) + + assert.NotNil(t, c) + assert.NoError(t, err) + + defer func() { assert.NoError(t, c.Close()) }() +} + +func TestNewClient_SuccessfullyConnectedOverride(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:55555") + assert.NoError(t, err) + + defer func() { assert.NoError(t, l.Close()) }() + + c, err := rpc.NewClient("test/config_rpc_empty.yaml", []string{"rpc.listen=tcp://127.0.0.1:55555"}) + + assert.NotNil(t, c) + assert.NoError(t, err) + + defer func() { assert.NoError(t, c.Close()) }() +} + +func TestNewClient_SuccessfullyConnectedEnv(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:55556") + assert.NoError(t, err) + + defer func() { assert.NoError(t, l.Close()) }() + + require.NoError(t, os.Setenv("RR_RPC_LISTEN", "tcp://127.0.0.1:55556")) + c, err := rpc.NewClient("test/config_rpc_ok.yaml", nil) + + assert.NotNil(t, c) + assert.NoError(t, err) + + defer func() { assert.NoError(t, c.Close()) }() +} + +// ${} syntax +func TestNewClient_SuccessfullyConnectedEnvDollarSyntax(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:55556") + assert.NoError(t, err) + + defer func() { assert.NoError(t, l.Close()) }() + + require.NoError(t, os.Setenv("RPC", "tcp://127.0.0.1:55556")) + c, err := rpc.NewClient("test/config_rpc_ok_env.yaml", nil) assert.NotNil(t, c) assert.NoError(t, err) diff --git a/internal/rpc/test/config_rpc_ok_env.yaml b/internal/rpc/test/config_rpc_ok_env.yaml new file mode 100644 index 00000000..fd0d3f11 --- /dev/null +++ b/internal/rpc/test/config_rpc_ok_env.yaml @@ -0,0 +1,2 @@ +rpc: + listen: ${RPC} -- cgit v1.2.3