summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-21 13:14:42 +0300
committerValery Piashchynski <[email protected]>2020-12-21 13:14:42 +0300
commite7a62f19155bbba9ac17527e2abb30d31c632655 (patch)
treef060dbd61f0dc0d1093937eced738bd6aead1373 /plugins
parentb403dd170bf3dc3ce451ba4ada40dd55773b032a (diff)
Finish redis plugin
Diffstat (limited to 'plugins')
-rwxr-xr-xplugins/config/plugin.go29
-rw-r--r--plugins/redis/config.go36
-rw-r--r--plugins/redis/plugin.go63
-rw-r--r--plugins/redis/tests/configs/.rr-redis.yaml25
-rw-r--r--plugins/redis/tests/plugin1.go43
-rw-r--r--plugins/redis/tests/redis_plugin_test.go124
-rw-r--r--plugins/redis/tests/redis_plugin_tests.go1
7 files changed, 243 insertions, 78 deletions
diff --git a/plugins/config/plugin.go b/plugins/config/plugin.go
index 2555d28a..b438a185 100755
--- a/plugins/config/plugin.go
+++ b/plugins/config/plugin.go
@@ -1,6 +1,7 @@
package config
import (
+ "bytes"
"errors"
"fmt"
"strings"
@@ -9,9 +10,10 @@ import (
)
type Viper struct {
- viper *viper.Viper
- Path string
- Prefix string
+ viper *viper.Viper
+ Path string
+ Prefix string
+ ReadInCfg []byte
}
// Inits config provider.
@@ -32,17 +34,16 @@ func (v *Viper) Init() error {
v.viper.SetConfigFile(v.Path)
v.viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
+ if v.ReadInCfg != nil {
+ return v.viper.ReadConfig(bytes.NewBuffer(v.ReadInCfg))
+ }
return v.viper.ReadInConfig()
}
// Overwrite overwrites existing config with provided values
-func (v *Viper) Overwrite(values map[string]string) error {
+func (v *Viper) Overwrite(values map[string]interface{}) error {
if len(values) != 0 {
- for _, flag := range values {
- key, value, err := parseFlag(flag)
- if err != nil {
- return err
- }
+ for key, value := range values {
v.viper.Set(key, value)
}
}
@@ -69,16 +70,6 @@ func (v *Viper) Has(name string) bool {
return v.viper.IsSet(name)
}
-func parseFlag(flag string) (string, string, error) {
- if !strings.Contains(flag, "=") {
- return "", "", fmt.Errorf("invalid flag `%s`", flag)
- }
-
- parts := strings.SplitN(strings.TrimLeft(flag, " \"'`"), "=", 2)
-
- return strings.Trim(parts[0], " \n\t"), parseValue(strings.Trim(parts[1], " \n\t")), nil
-}
-
func parseValue(value string) string {
escape := []rune(value)[0]
diff --git a/plugins/redis/config.go b/plugins/redis/config.go
index b39fcd00..ebcefed1 100644
--- a/plugins/redis/config.go
+++ b/plugins/redis/config.go
@@ -1,18 +1,32 @@
package redis
+import "time"
+
type Config struct {
- // Addr is address to use. If len > 1, cluster client will be used
- Addr []string
- // database number to use, 0 is used by default
- DB int
- // Master name for failover client, empty by default
- Master string
- // Redis password, empty by default
- Password string
+ Addrs []string `yaml:"addrs"`
+ DB int `yaml:"db"`
+ Username string `yaml:"username"`
+ Password string `yaml:"password"`
+ MasterName string `yaml:"master_name"`
+ SentinelPassword string `yaml:"sentinel_password"`
+ RouteByLatency bool `yaml:"route_by_latency"`
+ RouteRandomly bool `yaml:"route_randomly"`
+ MaxRetries int `yaml:"max_retries"`
+ DialTimeout time.Duration `yaml:"dial_timeout"`
+ MinRetryBackoff time.Duration `yaml:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `yaml:"max_retry_backoff"`
+ PoolSize int `yaml:"pool_size"`
+ MinIdleConns int `yaml:"min_idle_conns"`
+ MaxConnAge time.Duration `yaml:"max_conn_age"`
+ ReadTimeout time.Duration `yaml:"read_timeout"`
+ WriteTimeout time.Duration `yaml:"write_timeout"`
+ PoolTimeout time.Duration `yaml:"pool_timeout"`
+ IdleTimeout time.Duration `yaml:"idle_timeout"`
+ IdleCheckFreq time.Duration `yaml:"idle_check_freq"`
+ ReadOnly bool `yaml:"read_only"`
}
// InitDefaults initializing fill config with default values
-func (s *Config) InitDefaults() error {
- s.Addr = []string{"localhost:6379"} // default addr is pointing to local storage
- return nil
+func (s *Config) InitDefaults() {
+ s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage
}
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 64b6024e..08bb7972 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -2,6 +2,7 @@ package redis
import (
"github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/config"
"github.com/spiral/roadrunner/v2/interfaces/log"
)
@@ -11,44 +12,62 @@ const PluginName = "redis"
type Plugin struct {
// config for RR integration
cfg *Config
- // redis client
- universalClient *redis.UniversalClient
- clusterClient *redis.ClusterClient
- client *redis.Client
- sentinelClient *redis.SentinelClient
+ // logger
+ log log.Logger
+ // redis universal client
+ universalClient redis.UniversalClient
}
-func (s *Plugin) GetClient() *redis.Client {
- return s.client
-}
-
-func (s *Plugin) GetUniversalClient() *redis.UniversalClient {
+func (s *Plugin) GetClient() redis.UniversalClient {
return s.universalClient
}
-func (s *Plugin) GetClusterClient() *redis.ClusterClient {
- return s.clusterClient
-}
+func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error {
+ const op = errors.Op("redis plugin init")
+ s.cfg = &Config{}
+ s.cfg.InitDefaults()
-func (s *Plugin) GetSentinelClient() *redis.SentinelClient {
- return s.sentinelClient
-}
+ err := cfg.UnmarshalKey(PluginName, &s.cfg)
+ if err != nil {
+ return errors.E(op, errors.Disabled, err)
+ }
+
+ s.log = log
+
+ s.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: s.cfg.Addrs,
+ DB: s.cfg.DB,
+ Username: s.cfg.Username,
+ Password: s.cfg.Password,
+ SentinelPassword: s.cfg.SentinelPassword,
+ MaxRetries: s.cfg.MaxRetries,
+ MinRetryBackoff: s.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: s.cfg.MaxRetryBackoff,
+ DialTimeout: s.cfg.DialTimeout,
+ ReadTimeout: s.cfg.ReadTimeout,
+ WriteTimeout: s.cfg.WriteTimeout,
+ PoolSize: s.cfg.PoolSize,
+ MinIdleConns: s.cfg.MinIdleConns,
+ MaxConnAge: s.cfg.MaxConnAge,
+ PoolTimeout: s.cfg.PoolTimeout,
+ IdleTimeout: s.cfg.IdleTimeout,
+ IdleCheckFrequency: s.cfg.IdleCheckFreq,
+ ReadOnly: s.cfg.ReadOnly,
+ RouteByLatency: s.cfg.RouteByLatency,
+ RouteRandomly: s.cfg.RouteRandomly,
+ MasterName: s.cfg.MasterName,
+ })
-func (s *Plugin) Init(cfg config.Configurer, log log.Logger) error {
- _ = cfg
- _ = log
- _ = s.cfg
return nil
}
func (s *Plugin) Serve() chan error {
errCh := make(chan error, 1)
-
return errCh
}
func (s Plugin) Stop() error {
- return nil
+ return s.universalClient.Close()
}
func (s *Plugin) Name() string {
diff --git a/plugins/redis/tests/configs/.rr-redis.yaml b/plugins/redis/tests/configs/.rr-redis.yaml
deleted file mode 100644
index 52198a35..00000000
--- a/plugins/redis/tests/configs/.rr-redis.yaml
+++ /dev/null
@@ -1,25 +0,0 @@
-redis:
- - cluster:
- addr:
- - 'localhost:6379'
- db: 0
- master: null
- password: ''
- - universal:
- addr:
- - 'localhost:6379'
- db: 0
- master: null
- password: ''
- - default:
- addr:
- - 'localhost:6379'
- db: 0
- master: null
- password: ''
- - sentinel:
- addr:
- - 'localhost:6379'
- db: 0
- master: null
- password: '' \ No newline at end of file
diff --git a/plugins/redis/tests/plugin1.go b/plugins/redis/tests/plugin1.go
new file mode 100644
index 00000000..e19ca90a
--- /dev/null
+++ b/plugins/redis/tests/plugin1.go
@@ -0,0 +1,43 @@
+package tests
+
+import (
+ "context"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ redisPlugin "github.com/spiral/roadrunner/v2/interfaces/redis"
+)
+
+type Plugin1 struct {
+ redisClient redis.UniversalClient
+}
+
+func (p *Plugin1) Init(redis redisPlugin.Redis) error {
+ p.redisClient = redis.GetClient()
+ return nil
+}
+
+func (p *Plugin1) Serve() chan error {
+ const op = errors.Op("plugin1 serve")
+ errCh := make(chan error, 1)
+ p.redisClient.Set(context.Background(), "foo", "bar", time.Minute)
+
+ stringCmd := p.redisClient.Get(context.Background(), "foo")
+ data, err := stringCmd.Result()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ if data != "bar" {
+ errCh <- errors.E(op, errors.Str("no such key"))
+ return errCh
+ }
+
+ return errCh
+}
+
+func (p *Plugin1) Stop() error {
+ return p.redisClient.Close()
+}
diff --git a/plugins/redis/tests/redis_plugin_test.go b/plugins/redis/tests/redis_plugin_test.go
new file mode 100644
index 00000000..8f8da983
--- /dev/null
+++ b/plugins/redis/tests/redis_plugin_test.go
@@ -0,0 +1,124 @@
+package tests
+
+import (
+ "fmt"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/alicebob/miniredis/v2"
+ "github.com/golang/mock/gomock"
+ "github.com/spiral/endure"
+ "github.com/spiral/roadrunner/v2/mocks"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/redis"
+ "github.com/stretchr/testify/assert"
+)
+
+func redisConfig(port string) string {
+ cfg := `
+redis:
+ addrs:
+ - 'localhost:%s'
+ master_name: ''
+ username: ''
+ password: ''
+ db: 0
+ sentinel_password: ''
+ route_by_latency: false
+ route_randomly: false
+ dial_timeout: 0
+ max_retries: 1
+ min_retry_backoff: 0
+ max_retry_backoff: 0
+ pool_size: 0
+ min_idle_conns: 0
+ max_conn_age: 0
+ read_timeout: 0
+ write_timeout: 0
+ pool_timeout: 0
+ idle_timeout: 0
+ idle_check_freq: 0
+ read_only: false
+`
+ return fmt.Sprintf(cfg, port)
+}
+
+func TestRedisInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.DebugLevel))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ s, err := miniredis.Run()
+ if err != nil {
+ panic(err)
+ }
+ defer s.Close()
+
+ c := redisConfig(s.Port())
+
+ cfg := &config.Viper{}
+ cfg.Prefix = "rr"
+ cfg.Path = ".rr-redis.yaml"
+ cfg.ReadInCfg = []byte(c)
+
+ controller := gomock.NewController(t)
+ mockLogger := mocks.NewMockLogger(controller)
+
+ err = cont.RegisterAll(
+ cfg,
+ mockLogger,
+ &redis.Plugin{},
+ &Plugin1{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ tt := time.NewTimer(time.Second * 10)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-tt.C:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ wg.Wait()
+}
diff --git a/plugins/redis/tests/redis_plugin_tests.go b/plugins/redis/tests/redis_plugin_tests.go
deleted file mode 100644
index ca8701d2..00000000
--- a/plugins/redis/tests/redis_plugin_tests.go
+++ /dev/null
@@ -1 +0,0 @@
-package tests