summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2022-03-09 09:09:46 +0100
committerValery Piashchynski <[email protected]>2022-03-09 09:09:46 +0100
commitac021525b68512f2d855ec7a870a0d2086a5f318 (patch)
tree30dd607d2f8c88b34871248eeb421c0a0c1fc69b
parent1c5a6a590832bbefb2cab99a81f413a5a0e756de (diff)
viper auto envs and ${} syntax support for the internal RPC client
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--go.mod2
-rw-r--r--internal/cli/reset/command.go4
-rw-r--r--internal/cli/reset/command_test.go2
-rw-r--r--internal/cli/root.go4
-rw-r--r--internal/cli/workers/command.go4
-rw-r--r--internal/cli/workers/command_test.go4
-rw-r--r--internal/rpc/client.go87
-rw-r--r--internal/rpc/client_test.go59
-rw-r--r--internal/rpc/test/config_rpc_ok_env.yaml2
9 files changed, 142 insertions, 26 deletions
diff --git a/go.mod b/go.mod
index 7fe22c99..82c9a3c8 100644
--- a/go.mod
+++ b/go.mod
@@ -47,6 +47,7 @@ require (
github.com/roadrunner-server/tcp/v2 v2.9.2
github.com/roadrunner-server/websockets/v2 v2.9.2
github.com/spf13/cobra v1.3.0
+ github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.7.0
github.com/temporalio/roadrunner-temporal v1.3.1
github.com/vbauerster/mpb/v5 v5.4.0
@@ -130,7 +131,6 @@ require (
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
- github.com/spf13/viper v1.10.1 // indirect
github.com/stretchr/objx v0.3.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.9 // indirect
diff --git a/internal/cli/reset/command.go b/internal/cli/reset/command.go
index a0634c31..ce29d248 100644
--- a/internal/cli/reset/command.go
+++ b/internal/cli/reset/command.go
@@ -17,7 +17,7 @@ import (
var spinnerStyle = []string{"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"} //nolint:gochecknoglobals
// NewCommand creates `reset` command.
-func NewCommand(cfgFile *string) *cobra.Command { //nolint:funlen
+func NewCommand(cfgFile *string, override *[]string) *cobra.Command { //nolint:funlen
return &cobra.Command{
Use: "reset",
Short: "Reset workers of all or specific RoadRunner service",
@@ -32,7 +32,7 @@ func NewCommand(cfgFile *string) *cobra.Command { //nolint:funlen
return errors.E(op, errors.Str("no configuration file provided"))
}
- client, err := internalRpc.NewClient(*cfgFile)
+ client, err := internalRpc.NewClient(*cfgFile, *override)
if err != nil {
return err
}
diff --git a/internal/cli/reset/command_test.go b/internal/cli/reset/command_test.go
index ca68cb54..57801453 100644
--- a/internal/cli/reset/command_test.go
+++ b/internal/cli/reset/command_test.go
@@ -10,7 +10,7 @@ import (
func TestCommandProperties(t *testing.T) {
path := ""
- cmd := reset.NewCommand(&path)
+ cmd := reset.NewCommand(&path, nil)
assert.Equal(t, "reset", cmd.Use)
assert.NotNil(t, cmd.RunE)
diff --git a/internal/cli/root.go b/internal/cli/root.go
index 0f35d416..07fc891c 100644
--- a/internal/cli/root.go
+++ b/internal/cli/root.go
@@ -91,8 +91,8 @@ func NewCommand(cmdName string) *cobra.Command { //nolint:funlen
f.StringArrayVarP(override, "override", "o", nil, "override config value (dot.notation=value)")
cmd.AddCommand(
- workers.NewCommand(cfgFile),
- reset.NewCommand(cfgFile),
+ workers.NewCommand(cfgFile, override),
+ reset.NewCommand(cfgFile, override),
serve.NewCommand(override, cfgFile, silent),
)
diff --git a/internal/cli/workers/command.go b/internal/cli/workers/command.go
index f498b7af..78d8d1d2 100644
--- a/internal/cli/workers/command.go
+++ b/internal/cli/workers/command.go
@@ -19,7 +19,7 @@ import (
)
// NewCommand creates `workers` command.
-func NewCommand(cfgFile *string) *cobra.Command { //nolint:funlen
+func NewCommand(cfgFile *string, override *[]string) *cobra.Command { //nolint:funlen
var (
// interactive workers updates
interactive bool
@@ -38,7 +38,7 @@ func NewCommand(cfgFile *string) *cobra.Command { //nolint:funlen
return errors.E(op, errors.Str("no configuration file provided"))
}
- client, err := internalRpc.NewClient(*cfgFile)
+ client, err := internalRpc.NewClient(*cfgFile, *override)
if err != nil {
return err
}
diff --git a/internal/cli/workers/command_test.go b/internal/cli/workers/command_test.go
index a3d144df..c68ec9b1 100644
--- a/internal/cli/workers/command_test.go
+++ b/internal/cli/workers/command_test.go
@@ -9,14 +9,14 @@ import (
)
func TestCommandProperties(t *testing.T) {
- cmd := workers.NewCommand(nil)
+ cmd := workers.NewCommand(nil, nil)
assert.Equal(t, "workers", cmd.Use)
assert.NotNil(t, cmd.RunE)
}
func TestCommandFlags(t *testing.T) {
- cmd := workers.NewCommand(nil)
+ cmd := workers.NewCommand(nil, nil)
cases := []struct {
giveName string
diff --git a/internal/rpc/client.go b/internal/rpc/client.go
index 7d945add..1e0fbbac 100644
--- a/internal/rpc/client.go
+++ b/internal/rpc/client.go
@@ -1,19 +1,32 @@
// Package rpc contains wrapper around RPC client ONLY for internal usage.
+// Should be in sync with the RPC plugin
package rpc
import (
+ "errors"
+ "fmt"
+ "net"
"net/rpc"
+ "os"
+ "strings"
- "github.com/roadrunner-server/errors"
goridgeRpc "github.com/roadrunner-server/goridge/v3/pkg/rpc"
rpcPlugin "github.com/roadrunner-server/rpc/v2"
"github.com/spf13/viper"
)
+const (
+ prefix string = "rr"
+ rpcKey string = "rpc.listen"
+)
+
// NewClient creates client ONLY for internal usage (communication between our application with RR side).
// Client will be connected to the RPC.
-func NewClient(cfg string) (*rpc.Client, error) {
+func NewClient(cfg string, flags []string) (*rpc.Client, error) {
v := viper.New()
+ v.AutomaticEnv()
+ v.SetEnvPrefix(prefix)
+ v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.SetConfigFile(cfg)
err := v.ReadInConfig()
@@ -21,23 +34,77 @@ func NewClient(cfg string) (*rpc.Client, error) {
return nil, err
}
- if !v.IsSet(rpcPlugin.PluginName) {
- return nil, errors.E("rpc service disabled")
+ // automatically inject ENV variables using ${ENV} pattern
+ for _, key := range v.AllKeys() {
+ val := v.Get(key)
+ if s, ok := val.(string); ok {
+ v.Set(key, os.ExpandEnv(s))
+ }
}
- rpcConfig := &rpcPlugin.Config{}
+ // override config Flags
+ if len(flags) > 0 {
+ for _, f := range flags {
+ key, val, errP := parseFlag(f)
+ if errP != nil {
+ return nil, errP
+ }
- err = v.UnmarshalKey(rpcPlugin.PluginName, rpcConfig)
- if err != nil {
- return nil, err
+ v.Set(key, val)
+ }
}
- rpcConfig.InitDefaults()
+ // rpc.listen might be set by the -o flags or env variable
+ if !v.IsSet(rpcPlugin.PluginName) {
+ return nil, errors.New("rpc service not specified in the configuration. Tip: add\n rpc:\n\r listen: rr_rpc_address")
+ }
- conn, err := rpcConfig.Dialer()
+ conn, err := Dialer(v.GetString(rpcKey))
if err != nil {
return nil, err
}
return rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)), nil
}
+
+// Dialer creates rpc socket Dialer.
+func Dialer(addr string) (net.Conn, error) {
+ dsn := strings.Split(addr, "://")
+ if len(dsn) != 2 {
+ return nil, errors.New("invalid socket DSN (tcp://:6001, unix://file.sock)")
+ }
+
+ return net.Dial(dsn[0], dsn[1])
+}
+
+func parseFlag(flag string) (string, string, error) {
+ if !strings.Contains(flag, "=") {
+ return "", "", fmt.Errorf("invalid flag `%s`", flag)
+ }
+
+ parts := strings.SplitN(strings.TrimLeft(flag, " \"'`"), "=", 2)
+ if len(parts) < 2 {
+ return "", "", errors.New("usage: -o key=value")
+ }
+
+ if parts[0] == "" {
+ return "", "", errors.New("key should not be empty")
+ }
+
+ if parts[1] == "" {
+ return "", "", errors.New("value should not be empty")
+ }
+
+ return strings.Trim(parts[0], " \n\t"), parseValue(strings.Trim(parts[1], " \n\t")), nil
+}
+
+func parseValue(value string) string {
+ escape := []rune(value)[0]
+
+ if escape == '"' || escape == '\'' || escape == '`' {
+ value = strings.Trim(value, string(escape))
+ value = strings.ReplaceAll(value, fmt.Sprintf("\\%s", string(escape)), string(escape))
+ }
+
+ return value
+}
diff --git a/internal/rpc/client_test.go b/internal/rpc/client_test.go
index 0744e167..42b2d87e 100644
--- a/internal/rpc/client_test.go
+++ b/internal/rpc/client_test.go
@@ -2,9 +2,11 @@ package rpc_test
import (
"net"
+ "os"
"testing"
"github.com/roadrunner-server/roadrunner/v2/internal/rpc"
+ "github.com/stretchr/testify/require"
"github.com/roadrunner-server/config/v2"
"github.com/stretchr/testify/assert"
@@ -14,22 +16,22 @@ func TestNewClient_RpcServiceDisabled(t *testing.T) {
cfgPlugin := &config.Plugin{Type: "yaml", ReadInCfg: []byte{}}
assert.NoError(t, cfgPlugin.Init())
- c, err := rpc.NewClient("test/config_rpc_empty.yaml")
+ c, err := rpc.NewClient("test/config_rpc_empty.yaml", nil)
assert.Nil(t, c)
- assert.EqualError(t, err, "rpc service disabled")
+ assert.EqualError(t, err, "rpc service not specified")
}
func TestNewClient_WrongRcpConfiguration(t *testing.T) {
- c, err := rpc.NewClient("test/config_rpc_wrong.yaml")
+ c, err := rpc.NewClient("test/config_rpc_wrong.yaml", nil)
assert.Nil(t, c)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "'' expected a map, got 'string'")
+ assert.Contains(t, err.Error(), "invalid socket DSN")
}
func TestNewClient_ConnectionError(t *testing.T) {
- c, err := rpc.NewClient("test/config_rpc_conn_err.yaml")
+ c, err := rpc.NewClient("test/config_rpc_conn_err.yaml", nil)
assert.Nil(t, c)
assert.Error(t, err)
@@ -42,7 +44,52 @@ func TestNewClient_SuccessfullyConnected(t *testing.T) {
defer func() { assert.NoError(t, l.Close()) }()
- c, err := rpc.NewClient("test/config_rpc_ok.yaml")
+ c, err := rpc.NewClient("test/config_rpc_ok.yaml", nil)
+
+ assert.NotNil(t, c)
+ assert.NoError(t, err)
+
+ defer func() { assert.NoError(t, c.Close()) }()
+}
+
+func TestNewClient_SuccessfullyConnectedOverride(t *testing.T) {
+ l, err := net.Listen("tcp", "127.0.0.1:55555")
+ assert.NoError(t, err)
+
+ defer func() { assert.NoError(t, l.Close()) }()
+
+ c, err := rpc.NewClient("test/config_rpc_empty.yaml", []string{"rpc.listen=tcp://127.0.0.1:55555"})
+
+ assert.NotNil(t, c)
+ assert.NoError(t, err)
+
+ defer func() { assert.NoError(t, c.Close()) }()
+}
+
+func TestNewClient_SuccessfullyConnectedEnv(t *testing.T) {
+ l, err := net.Listen("tcp", "127.0.0.1:55556")
+ assert.NoError(t, err)
+
+ defer func() { assert.NoError(t, l.Close()) }()
+
+ require.NoError(t, os.Setenv("RR_RPC_LISTEN", "tcp://127.0.0.1:55556"))
+ c, err := rpc.NewClient("test/config_rpc_ok.yaml", nil)
+
+ assert.NotNil(t, c)
+ assert.NoError(t, err)
+
+ defer func() { assert.NoError(t, c.Close()) }()
+}
+
+// ${} syntax
+func TestNewClient_SuccessfullyConnectedEnvDollarSyntax(t *testing.T) {
+ l, err := net.Listen("tcp", "127.0.0.1:55556")
+ assert.NoError(t, err)
+
+ defer func() { assert.NoError(t, l.Close()) }()
+
+ require.NoError(t, os.Setenv("RPC", "tcp://127.0.0.1:55556"))
+ c, err := rpc.NewClient("test/config_rpc_ok_env.yaml", nil)
assert.NotNil(t, c)
assert.NoError(t, err)
diff --git a/internal/rpc/test/config_rpc_ok_env.yaml b/internal/rpc/test/config_rpc_ok_env.yaml
new file mode 100644
index 00000000..fd0d3f11
--- /dev/null
+++ b/internal/rpc/test/config_rpc_ok_env.yaml
@@ -0,0 +1,2 @@
+rpc:
+ listen: ${RPC}