summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-14 16:39:02 +0300
committerValery Piashchynski <[email protected]>2021-06-14 16:39:02 +0300
commit75ab1e16c64cfd0a6424fe4c546fdbc5e1b992dd (patch)
tree1e9a910071d20021ad0f7ef4fe6099bac6a341ef
parentdc8ed203c247afd684f198ebbac103a10bfad72a (diff)
- Rework redis with ws plugins
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r--pkg/pubsub/interface.go4
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go2
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/drivers/memory/plugin.go2
-rw-r--r--plugins/kv/drivers/redis/config.go34
-rw-r--r--plugins/kv/drivers/redis/plugin.go51
-rw-r--r--plugins/kv/interface.go2
-rw-r--r--plugins/kv/plugin.go (renamed from plugins/kv/storage.go)40
-rw-r--r--plugins/memory/plugin.go25
-rw-r--r--plugins/memory/pubsub.go (renamed from plugins/websockets/memory/inMemory.go)24
-rw-r--r--plugins/redis/clients.go84
-rw-r--r--plugins/redis/interface.go7
-rw-r--r--plugins/redis/kv.go (renamed from plugins/kv/drivers/redis/driver.go)0
-rw-r--r--plugins/redis/plugin.go181
-rw-r--r--plugins/redis/pubsub.go189
-rw-r--r--plugins/websockets/config.go43
-rw-r--r--plugins/websockets/plugin.go77
-rw-r--r--tests/plugins/kv/configs/.rr-redis-global.yaml14
-rw-r--r--tests/plugins/kv/configs/.rr-redis-no-config.yaml10
-rw-r--r--tests/plugins/kv/storage_plugin_test.go141
-rw-r--r--tests/plugins/redis/plugin1.go6
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go7
22 files changed, 658 insertions, 287 deletions
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index 4926cad6..d021dbbe 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -44,3 +44,7 @@ type Publisher interface {
type Reader interface {
Next() (*websocketsv1.Message, error)
}
+
+type PSProvider interface {
+ PSProvide(key string) (PubSub, error)
+}
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 9b4cf9f7..28e2a89c 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -46,7 +46,7 @@ func (s *Plugin) Stop() error {
return nil
}
-func (s *Plugin) Provide(key string) (kv.Storage, error) {
+func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index 3997e0d4..936b2047 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -34,7 +34,7 @@ func (s *Plugin) Name() string {
// Available interface implementation
func (s *Plugin) Available() {}
-func (s *Plugin) Provide(key string) (kv.Storage, error) {
+func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
diff --git a/plugins/kv/drivers/memory/plugin.go b/plugins/kv/drivers/memory/plugin.go
index 2be7caae..da81017e 100644
--- a/plugins/kv/drivers/memory/plugin.go
+++ b/plugins/kv/drivers/memory/plugin.go
@@ -45,7 +45,7 @@ func (s *Plugin) Stop() error {
return nil
}
-func (s *Plugin) Provide(key string) (kv.Storage, error) {
+func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
st, err := NewInMemoryDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
diff --git a/plugins/kv/drivers/redis/config.go b/plugins/kv/drivers/redis/config.go
deleted file mode 100644
index 41348236..00000000
--- a/plugins/kv/drivers/redis/config.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package redis
-
-import "time"
-
-type Config struct {
- Addrs []string `mapstructure:"addrs"`
- DB int `mapstructure:"db"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- MasterName string `mapstructure:"master_name"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- MaxRetries int `mapstructure:"max_retries"`
- DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge time.Duration `mapstructure:"max_conn_age"`
- ReadTimeout time.Duration `mapstructure:"read_timeout"`
- WriteTimeout time.Duration `mapstructure:"write_timeout"`
- PoolTimeout time.Duration `mapstructure:"pool_timeout"`
- IdleTimeout time.Duration `mapstructure:"idle_timeout"`
- IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
-}
-
-// InitDefaults initializing fill config with default values
-func (s *Config) InitDefaults() {
- if s.Addrs == nil {
- s.Addrs = []string{"localhost:6379"} // default addr is pointing to local storage
- }
-}
diff --git a/plugins/kv/drivers/redis/plugin.go b/plugins/kv/drivers/redis/plugin.go
deleted file mode 100644
index 3694c5a7..00000000
--- a/plugins/kv/drivers/redis/plugin.go
+++ /dev/null
@@ -1,51 +0,0 @@
-package redis
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/kv"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const PluginName = "redis"
-
-// Plugin BoltDB K/V storage.
-type Plugin struct {
- cfgPlugin config.Configurer
- // logger
- log logger.Logger
-}
-
-func (s *Plugin) Init(log logger.Logger, cfg config.Configurer) error {
- if !cfg.Has(kv.PluginName) {
- return errors.E(errors.Disabled)
- }
-
- s.log = log
- s.cfgPlugin = cfg
- return nil
-}
-
-// Serve is noop here
-func (s *Plugin) Serve() chan error {
- return make(chan error)
-}
-
-func (s *Plugin) Stop() error {
- return nil
-}
-
-func (s *Plugin) Provide(key string) (kv.Storage, error) {
- const op = errors.Op("redis_plugin_provide")
- st, err := NewRedisDriver(s.log, key, s.cfgPlugin)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return st, nil
-}
-
-// Name returns plugin name
-func (s *Plugin) Name() string {
- return PluginName
-}
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 744c6b51..5aedd5c3 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -37,5 +37,5 @@ type StorageDriver interface {
// Provider provides storage based on the config
type Provider interface {
// Provide provides Storage based on the config key
- Provide(key string) (Storage, error)
+ KVProvide(key string) (Storage, error)
}
diff --git a/plugins/kv/storage.go b/plugins/kv/plugin.go
index 9a609735..efe92252 100644
--- a/plugins/kv/storage.go
+++ b/plugins/kv/plugin.go
@@ -50,7 +50,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
return nil
}
-func (p *Plugin) Serve() chan error {
+func (p *Plugin) Serve() chan error { //nolint:gocognit
errCh := make(chan error, 1)
const op = errors.Op("kv_plugin_serve")
// key - storage name in the config
@@ -100,7 +100,7 @@ func (p *Plugin) Serve() chan error {
continue
}
- storage, err := p.drivers[memcached].Provide(configKey)
+ storage, err := p.drivers[memcached].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -115,7 +115,7 @@ func (p *Plugin) Serve() chan error {
continue
}
- storage, err := p.drivers[boltdb].Provide(configKey)
+ storage, err := p.drivers[boltdb].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -129,7 +129,7 @@ func (p *Plugin) Serve() chan error {
continue
}
- storage, err := p.drivers[memory].Provide(configKey)
+ storage, err := p.drivers[memory].KVProvide(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -143,15 +143,33 @@ func (p *Plugin) Serve() chan error {
continue
}
- storage, err := p.drivers[redis].Provide(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ storage, err := p.drivers[redis].KVProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ case p.cfgPlugin.Has(redis):
+ storage, err := p.drivers[redis].KVProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the storage
+ p.storages[k] = storage
+ continue
+ default:
+ // otherwise - error, no local or global config
+ p.log.Warn("no global or local redis configuration provided", "key", configKey)
+ continue
}
- // save the storage
- p.storages[k] = storage
-
default:
p.log.Error("unknown storage", errors.E(op, errors.Errorf("unknown storage %s", v.(map[string]interface{})[driver])))
}
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
new file mode 100644
index 00000000..650f0b4b
--- /dev/null
+++ b/plugins/memory/plugin.go
@@ -0,0 +1,25 @@
+package memory
+
+import (
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const PluginName string = "memory"
+
+type Plugin struct {
+ log logger.Logger
+}
+
+func (p *Plugin) Init(log logger.Logger) error {
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
+ return NewPubSubDriver(p.log, key)
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
diff --git a/plugins/websockets/memory/inMemory.go b/plugins/memory/pubsub.go
index cef28182..75cd9d24 100644
--- a/plugins/websockets/memory/inMemory.go
+++ b/plugins/memory/pubsub.go
@@ -10,36 +10,36 @@ import (
"google.golang.org/protobuf/proto"
)
-type Plugin struct {
+type PubSubDriver struct {
sync.RWMutex
- log logger.Logger
-
// channel with the messages from the RPC
pushCh chan []byte
// user-subscribed topics
storage bst.Storage
+ log logger.Logger
}
-func NewInMemory(log logger.Logger) pubsub.PubSub {
- return &Plugin{
- log: log,
+func NewPubSubDriver(log logger.Logger, _ string) (pubsub.PubSub, error) {
+ ps := &PubSubDriver{
pushCh: make(chan []byte, 10),
storage: bst.NewBST(),
+ log: log,
}
+ return ps, nil
}
-func (p *Plugin) Publish(message []byte) error {
+func (p *PubSubDriver) Publish(message []byte) error {
p.pushCh <- message
return nil
}
-func (p *Plugin) PublishAsync(message []byte) {
+func (p *PubSubDriver) PublishAsync(message []byte) {
go func() {
p.pushCh <- message
}()
}
-func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
+func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
p.Lock()
defer p.Unlock()
for i := 0; i < len(topics); i++ {
@@ -48,7 +48,7 @@ func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
return nil
}
-func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
+func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
p.Lock()
defer p.Unlock()
for i := 0; i < len(topics); i++ {
@@ -57,7 +57,7 @@ func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
return nil
}
-func (p *Plugin) Connections(topic string, res map[string]struct{}) {
+func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
p.RLock()
defer p.RUnlock()
@@ -67,7 +67,7 @@ func (p *Plugin) Connections(topic string, res map[string]struct{}) {
}
}
-func (p *Plugin) Next() (*websocketsv1.Message, error) {
+func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
msg := <-p.pushCh
if msg == nil {
return nil, nil
diff --git a/plugins/redis/clients.go b/plugins/redis/clients.go
new file mode 100644
index 00000000..d0a184d2
--- /dev/null
+++ b/plugins/redis/clients.go
@@ -0,0 +1,84 @@
+package redis
+
+import (
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+)
+
+// RedisClient return a client based on the provided section key
+// key sample: kv.some-section.redis
+// kv.redis
+// redis (root)
+func (p *Plugin) RedisClient(key string) (redis.UniversalClient, error) {
+ const op = errors.Op("redis_get_client")
+
+ if !p.cfgPlugin.Has(key) {
+ return nil, errors.E(op, errors.Errorf("no such section: %s", key))
+ }
+
+ cfg := &Config{}
+
+ err := p.cfgPlugin.UnmarshalKey(key, cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ cfg.InitDefaults()
+
+ uc := redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: cfg.Addrs,
+ DB: cfg.DB,
+ Username: cfg.Username,
+ Password: cfg.Password,
+ SentinelPassword: cfg.SentinelPassword,
+ MaxRetries: cfg.MaxRetries,
+ MinRetryBackoff: cfg.MaxRetryBackoff,
+ MaxRetryBackoff: cfg.MaxRetryBackoff,
+ DialTimeout: cfg.DialTimeout,
+ ReadTimeout: cfg.ReadTimeout,
+ WriteTimeout: cfg.WriteTimeout,
+ PoolSize: cfg.PoolSize,
+ MinIdleConns: cfg.MinIdleConns,
+ MaxConnAge: cfg.MaxConnAge,
+ PoolTimeout: cfg.PoolTimeout,
+ IdleTimeout: cfg.IdleTimeout,
+ IdleCheckFrequency: cfg.IdleCheckFreq,
+ ReadOnly: cfg.ReadOnly,
+ RouteByLatency: cfg.RouteByLatency,
+ RouteRandomly: cfg.RouteRandomly,
+ MasterName: cfg.MasterName,
+ })
+
+ return uc, nil
+}
+
+func (p *Plugin) DefaultClient() redis.UniversalClient {
+ cfg := &Config{}
+ cfg.InitDefaults()
+
+ uc := redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: cfg.Addrs,
+ DB: cfg.DB,
+ Username: cfg.Username,
+ Password: cfg.Password,
+ SentinelPassword: cfg.SentinelPassword,
+ MaxRetries: cfg.MaxRetries,
+ MinRetryBackoff: cfg.MaxRetryBackoff,
+ MaxRetryBackoff: cfg.MaxRetryBackoff,
+ DialTimeout: cfg.DialTimeout,
+ ReadTimeout: cfg.ReadTimeout,
+ WriteTimeout: cfg.WriteTimeout,
+ PoolSize: cfg.PoolSize,
+ MinIdleConns: cfg.MinIdleConns,
+ MaxConnAge: cfg.MaxConnAge,
+ PoolTimeout: cfg.PoolTimeout,
+ IdleTimeout: cfg.IdleTimeout,
+ IdleCheckFrequency: cfg.IdleCheckFreq,
+ ReadOnly: cfg.ReadOnly,
+ RouteByLatency: cfg.RouteByLatency,
+ RouteRandomly: cfg.RouteRandomly,
+ MasterName: cfg.MasterName,
+ })
+
+ return uc
+}
diff --git a/plugins/redis/interface.go b/plugins/redis/interface.go
index c0be6137..189b0002 100644
--- a/plugins/redis/interface.go
+++ b/plugins/redis/interface.go
@@ -4,6 +4,9 @@ import "github.com/go-redis/redis/v8"
// Redis in the redis KV plugin interface
type Redis interface {
- // GetClient provides universal redis client
- GetClient() redis.UniversalClient
+ // RedisClient provides universal redis client
+ RedisClient(key string) (redis.UniversalClient, error)
+
+ // DefaultClient provide default redis client based on redis defaults
+ DefaultClient() redis.UniversalClient
}
diff --git a/plugins/kv/drivers/redis/driver.go b/plugins/redis/kv.go
index 66cb8384..66cb8384 100644
--- a/plugins/kv/drivers/redis/driver.go
+++ b/plugins/redis/kv.go
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 47ffeb39..24c21b55 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -1,15 +1,14 @@
package redis
import (
- "context"
"sync"
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
)
const PluginName = "redis"
@@ -17,80 +16,37 @@ const PluginName = "redis"
type Plugin struct {
sync.RWMutex
// config for RR integration
- cfg *Config
+ cfgPlugin config.Configurer
// logger
log logger.Logger
// redis universal client
universalClient redis.UniversalClient
// fanIn implementation used to deliver messages from all channels to the single websocket point
- fanin *FanIn
-}
-
-func (p *Plugin) GetClient() redis.UniversalClient {
- return p.universalClient
+ stopCh chan struct{}
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("redis_plugin_init")
-
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &p.cfg)
- if err != nil {
- return errors.E(op, errors.Disabled, err)
- }
-
- p.cfg.InitDefaults()
p.log = log
-
- p.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: p.cfg.Addrs,
- DB: p.cfg.DB,
- Username: p.cfg.Username,
- Password: p.cfg.Password,
- SentinelPassword: p.cfg.SentinelPassword,
- MaxRetries: p.cfg.MaxRetries,
- MinRetryBackoff: p.cfg.MaxRetryBackoff,
- MaxRetryBackoff: p.cfg.MaxRetryBackoff,
- DialTimeout: p.cfg.DialTimeout,
- ReadTimeout: p.cfg.ReadTimeout,
- WriteTimeout: p.cfg.WriteTimeout,
- PoolSize: p.cfg.PoolSize,
- MinIdleConns: p.cfg.MinIdleConns,
- MaxConnAge: p.cfg.MaxConnAge,
- PoolTimeout: p.cfg.PoolTimeout,
- IdleTimeout: p.cfg.IdleTimeout,
- IdleCheckFrequency: p.cfg.IdleCheckFreq,
- ReadOnly: p.cfg.ReadOnly,
- RouteByLatency: p.cfg.RouteByLatency,
- RouteRandomly: p.cfg.RouteRandomly,
- MasterName: p.cfg.MasterName,
- })
-
- // init fanin
- p.fanin = newFanIn(p.universalClient, log)
+ p.cfgPlugin = cfg
+ p.stopCh = make(chan struct{}, 1)
return nil
}
func (p *Plugin) Serve() chan error {
- errCh := make(chan error)
- return errCh
+ return make(chan error)
}
func (p *Plugin) Stop() error {
const op = errors.Op("redis_plugin_stop")
- err := p.fanin.stop()
- if err != nil {
- return errors.E(op, err)
- }
+ p.stopCh <- struct{}{}
- err = p.universalClient.Close()
- if err != nil {
- return errors.E(op, err)
+ if p.universalClient != nil {
+ err := p.universalClient.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
}
return nil
@@ -103,112 +59,17 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-func (p *Plugin) Publish(msg []byte) error {
- p.Lock()
- defer p.Unlock()
-
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return errors.E(err)
- }
-
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- return f.Err()
- }
- }
- return nil
-}
-
-func (p *Plugin) PublishAsync(msg []byte) {
- go func() {
- p.Lock()
- defer p.Unlock()
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- p.log.Error("message unmarshal error")
- return
- }
-
- for j := 0; j < len(m.GetTopics()); j++ {
- f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
- if f.Err() != nil {
- p.log.Error("redis publish", "error", f.Err())
- }
- }
- }()
-}
-
-func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
- // just add a connection
- for i := 0; i < len(topics); i++ {
- // key - topic
- // value - connectionID
- hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID)
- res, err := hset.Result()
- if err != nil {
- return err
- }
- if res == 0 {
- p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i])
- continue
- }
- }
-
- // and subscribe after
- return p.fanin.sub(topics...)
-}
-
-func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
- // Remove topics from the storage
- for i := 0; i < len(topics); i++ {
- srem := p.universalClient.SRem(context.Background(), topics[i], connectionID)
- if srem.Err() != nil {
- return srem.Err()
- }
- }
-
- for i := 0; i < len(topics); i++ {
- // if there are no such topics, we can safely unsubscribe from the redis
- exists := p.universalClient.Exists(context.Background(), topics[i])
- res, err := exists.Result()
- if err != nil {
- return err
- }
-
- // if we have associated connections - skip
- if res == 1 { // exists means that topic still exists and some other nodes may have connections associated with it
- continue
- }
-
- // else - unsubscribe
- err = p.fanin.unsub(topics[i])
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-func (p *Plugin) Connections(topic string, res map[string]struct{}) {
- hget := p.universalClient.SMembersMap(context.Background(), topic)
- r, err := hget.Result()
+// KVProvide provides KV storage implementation over the redis plugin
+func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
+ const op = errors.Op("redis_plugin_provide")
+ st, err := NewRedisDriver(p.log, key, p.cfgPlugin)
if err != nil {
- panic(err)
+ return nil, errors.E(op, err)
}
- // assighn connections
- // res expected to be from the sync.Pool
- for k := range r {
- res[k] = struct{}{}
- }
+ return st, nil
}
-// Next return next message
-func (p *Plugin) Next() (*websocketsv1.Message, error) {
- return <-p.fanin.consume(), nil
+func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
+ return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
new file mode 100644
index 00000000..dbda7ea4
--- /dev/null
+++ b/plugins/redis/pubsub.go
@@ -0,0 +1,189 @@
+package redis
+
+import (
+ "context"
+ "sync"
+
+ "github.com/go-redis/redis/v8"
+ "github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
+)
+
+type PubSubDriver struct {
+ sync.RWMutex
+ cfg *Config `mapstructure:"redis"`
+
+ log logger.Logger
+ fanin *FanIn
+ universalClient redis.UniversalClient
+ stopCh chan struct{}
+}
+
+func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer, stopCh chan struct{}) (pubsub.PubSub, error) {
+ const op = errors.Op("new_pub_sub_driver")
+ ps := &PubSubDriver{
+ log: log,
+ stopCh: stopCh,
+ }
+
+ // will be different for every connected driver
+ err := cfgPlugin.UnmarshalKey(key, &ps.cfg)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ ps.cfg.InitDefaults()
+
+ ps.universalClient = redis.NewUniversalClient(&redis.UniversalOptions{
+ Addrs: ps.cfg.Addrs,
+ DB: ps.cfg.DB,
+ Username: ps.cfg.Username,
+ Password: ps.cfg.Password,
+ SentinelPassword: ps.cfg.SentinelPassword,
+ MaxRetries: ps.cfg.MaxRetries,
+ MinRetryBackoff: ps.cfg.MaxRetryBackoff,
+ MaxRetryBackoff: ps.cfg.MaxRetryBackoff,
+ DialTimeout: ps.cfg.DialTimeout,
+ ReadTimeout: ps.cfg.ReadTimeout,
+ WriteTimeout: ps.cfg.WriteTimeout,
+ PoolSize: ps.cfg.PoolSize,
+ MinIdleConns: ps.cfg.MinIdleConns,
+ MaxConnAge: ps.cfg.MaxConnAge,
+ PoolTimeout: ps.cfg.PoolTimeout,
+ IdleTimeout: ps.cfg.IdleTimeout,
+ IdleCheckFrequency: ps.cfg.IdleCheckFreq,
+ ReadOnly: ps.cfg.ReadOnly,
+ RouteByLatency: ps.cfg.RouteByLatency,
+ RouteRandomly: ps.cfg.RouteRandomly,
+ MasterName: ps.cfg.MasterName,
+ })
+
+ ps.fanin = newFanIn(ps.universalClient, log)
+
+ ps.stop()
+
+ return ps, nil
+}
+
+func (p *PubSubDriver) stop() {
+ go func() {
+ for range p.stopCh {
+ _ = p.fanin.stop()
+ return
+ }
+ }()
+}
+
+func (p *PubSubDriver) Publish(msg []byte) error {
+ p.Lock()
+ defer p.Unlock()
+
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ return errors.E(err)
+ }
+
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
+ if f.Err() != nil {
+ return f.Err()
+ }
+ }
+ return nil
+}
+
+func (p *PubSubDriver) PublishAsync(msg []byte) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ m := &websocketsv1.Message{}
+ err := proto.Unmarshal(msg, m)
+ if err != nil {
+ p.log.Error("message unmarshal error")
+ return
+ }
+
+ for j := 0; j < len(m.GetTopics()); j++ {
+ f := p.universalClient.Publish(context.Background(), m.GetTopics()[j], msg)
+ if f.Err() != nil {
+ p.log.Error("redis publish", "error", f.Err())
+ }
+ }
+ }()
+}
+
+func (p *PubSubDriver) Subscribe(connectionID string, topics ...string) error {
+ // just add a connection
+ for i := 0; i < len(topics); i++ {
+ // key - topic
+ // value - connectionID
+ hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID)
+ res, err := hset.Result()
+ if err != nil {
+ return err
+ }
+ if res == 0 {
+ p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i])
+ continue
+ }
+ }
+
+ // and subscribe after
+ return p.fanin.sub(topics...)
+}
+
+func (p *PubSubDriver) Unsubscribe(connectionID string, topics ...string) error {
+ // Remove topics from the storage
+ for i := 0; i < len(topics); i++ {
+ srem := p.universalClient.SRem(context.Background(), topics[i], connectionID)
+ if srem.Err() != nil {
+ return srem.Err()
+ }
+ }
+
+ for i := 0; i < len(topics); i++ {
+ // if there are no such topics, we can safely unsubscribe from the redis
+ exists := p.universalClient.Exists(context.Background(), topics[i])
+ res, err := exists.Result()
+ if err != nil {
+ return err
+ }
+
+ // if we have associated connections - skip
+ if res == 1 { // exists means that topic still exists and some other nodes may have connections associated with it
+ continue
+ }
+
+ // else - unsubscribe
+ err = p.fanin.unsub(topics[i])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (p *PubSubDriver) Connections(topic string, res map[string]struct{}) {
+ hget := p.universalClient.SMembersMap(context.Background(), topic)
+ r, err := hget.Result()
+ if err != nil {
+ panic(err)
+ }
+
+ // assighn connections
+ // res expected to be from the sync.Pool
+ for k := range r {
+ res[k] = struct{}{}
+ }
+}
+
+// Next return next message
+func (p *PubSubDriver) Next() (*websocketsv1.Message, error) {
+ return <-p.fanin.consume(), nil
+}
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
index be4aaa82..93d9ac3b 100644
--- a/plugins/websockets/config.go
+++ b/plugins/websockets/config.go
@@ -7,14 +7,48 @@ import (
)
/*
+# GLOBAL
+redis:
+ addrs:
+ - 'localhost:6379'
+
websockets:
# pubsubs should implement PubSub interface to be collected via endure.Collects
pubsubs:["redis", "amqp", "memory"]
+ # OR local
+ redis:
+ addrs:
+ - 'localhost:6379'
+
# path used as websockets path
path: "/ws"
*/
+type RedisConfig struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
// Config represents configuration for the ws plugin
type Config struct {
// http path for the websocket
@@ -23,6 +57,8 @@ type Config struct {
PubSubs []string `mapstructure:"pubsubs"`
Middleware []string `mapstructure:"middleware"`
+ Redis *RedisConfig `mapstructure:"redis"`
+
Pool *pool.Config `mapstructure:"pool"`
}
@@ -55,4 +91,11 @@ func (c *Config) InitDefault() {
}
c.Pool.Supervisor.InitDefaults()
}
+
+ if c.Redis != nil {
+ if c.Redis.Addrs == nil {
+ // append default
+ c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379")
+ }
+ }
}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 6ddd609c..c53491b4 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -2,6 +2,7 @@ package websockets
import (
"context"
+ "fmt"
"net/http"
"sync"
"time"
@@ -23,7 +24,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
- "github.com/spiral/roadrunner/v2/plugins/websockets/memory"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
"google.golang.org/protobuf/proto"
@@ -38,8 +38,11 @@ type Plugin struct {
// Collection with all available pubsubs
pubsubs map[string]pubsub.PubSub
- cfg *Config
- log logger.Logger
+ psProviders map[string]pubsub.PSProvider
+
+ cfg *Config
+ cfgPlugin config.Configurer
+ log logger.Logger
// global connections map
connections sync.Map
@@ -69,9 +72,12 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.cfg.InitDefault()
-
p.pubsubs = make(map[string]pubsub.PubSub)
+ p.psProviders = make(map[string]pubsub.PSProvider)
+
p.log = log
+ p.cfgPlugin = cfg
+
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
ReadBufferSize: 1024,
@@ -80,14 +86,18 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
- // attach default driver
- p.pubsubs["memory"] = memory.NewInMemory(p.log)
-
return nil
}
func (p *Plugin) Serve() chan error {
errCh := make(chan error)
+ const op = errors.Op("websockets_plugin_serve")
+
+ err := p.initPubSubs()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
go func() {
var err error
@@ -133,6 +143,54 @@ func (p *Plugin) Serve() chan error {
return errCh
}
+func (p *Plugin) initPubSubs() error {
+ for i := 0; i < len(p.cfg.PubSubs); i++ {
+ // don't need to have a section for the in-memory
+ if p.cfg.PubSubs[i] == "memory" {
+ if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+ r, err := provider.PSProvide("")
+ if err != nil {
+ return err
+ }
+
+ // append default in-memory provider
+ p.pubsubs["memory"] = r
+ }
+ continue
+ }
+ // key - memory, redis
+ if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+ // try local key
+ switch {
+ // try local config first
+ case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
+ r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
+ if err != nil {
+ return err
+ }
+
+ // append redis provider
+ p.pubsubs[p.cfg.PubSubs[i]] = r
+ case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
+ r, err := provider.PSProvide(p.cfg.PubSubs[i])
+ if err != nil {
+ return err
+ }
+
+ // append redis provider
+ p.pubsubs[p.cfg.PubSubs[i]] = r
+ default:
+ return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
+ }
+ } else {
+ // no such driver
+ p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
+ }
+ }
+
+ return nil
+}
+
func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
@@ -167,8 +225,8 @@ func (p *Plugin) Name() string {
}
// GetPublishers collects all pubsubs
-func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) {
- p.pubsubs[name.Name()] = pub
+func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) {
+ p.psProviders[name.Name()] = pub
}
func (p *Plugin) Middleware(next http.Handler) http.Handler {
@@ -389,7 +447,6 @@ func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValid
}
}
-// go:inline
func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) {
const op = errors.Op("exec")
pd := payload.Payload{
diff --git a/tests/plugins/kv/configs/.rr-redis-global.yaml b/tests/plugins/kv/configs/.rr-redis-global.yaml
new file mode 100644
index 00000000..d2e8aefe
--- /dev/null
+++ b/tests/plugins/kv/configs/.rr-redis-global.yaml
@@ -0,0 +1,14 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+logs:
+ mode: development
+ level: error
+
+redis-rr:
+ addrs:
+ - 'localhost:6379'
+
+kv:
+ redis-rr:
+ driver: redis
diff --git a/tests/plugins/kv/configs/.rr-redis-no-config.yaml b/tests/plugins/kv/configs/.rr-redis-no-config.yaml
new file mode 100644
index 00000000..9cf06374
--- /dev/null
+++ b/tests/plugins/kv/configs/.rr-redis-no-config.yaml
@@ -0,0 +1,10 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+logs:
+ mode: development
+ level: error
+
+kv:
+ redis-rr:
+ driver: redis
diff --git a/tests/plugins/kv/storage_plugin_test.go b/tests/plugins/kv/storage_plugin_test.go
index fd8a58cf..e7e7735a 100644
--- a/tests/plugins/kv/storage_plugin_test.go
+++ b/tests/plugins/kv/storage_plugin_test.go
@@ -18,8 +18,8 @@ import (
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/boltdb"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memcached"
"github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory"
- "github.com/spiral/roadrunner/v2/plugins/kv/drivers/redis"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/redis"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/stretchr/testify/assert"
)
@@ -158,6 +158,7 @@ func TestBoltDb(t *testing.T) {
&boltdb.Plugin{},
&rpcPlugin.Plugin{},
&logger.ZapLogger{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -373,6 +374,7 @@ func TestMemcached(t *testing.T) {
&memcached.Plugin{},
&rpcPlugin.Plugin{},
&logger.ZapLogger{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -801,6 +803,143 @@ func TestRedis(t *testing.T) {
&redis.Plugin{},
&rpcPlugin.Plugin{},
&logger.ZapLogger{},
+ &memory.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("REDIS", testRPCMethodsRedis)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestRedisGlobalSection(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-redis-global.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &kv.Plugin{},
+ &redis.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &memory.Plugin{},
+ )
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ assert.NoError(t, err)
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ t.Run("REDIS", testRPCMethodsRedis)
+ stopCh <- struct{}{}
+ wg.Wait()
+}
+
+func TestRedisNoConfig(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-redis-no-config.yaml", // should be used default
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &kv.Plugin{},
+ &redis.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
diff --git a/tests/plugins/redis/plugin1.go b/tests/plugins/redis/plugin1.go
index e50213e5..68da1394 100644
--- a/tests/plugins/redis/plugin1.go
+++ b/tests/plugins/redis/plugin1.go
@@ -14,8 +14,10 @@ type Plugin1 struct {
}
func (p *Plugin1) Init(redis redisPlugin.Redis) error {
- p.redisClient = redis.GetClient()
- return nil
+ var err error
+ p.redisClient, err = redis.RedisClient("redis")
+
+ return err
}
func (p *Plugin1) Serve() chan error {
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index 3ef144eb..88828c5a 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -20,6 +20,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
"github.com/spiral/roadrunner/v2/plugins/redis"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
@@ -45,6 +46,7 @@ func TestBroadcastInit(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -153,6 +155,7 @@ func TestWSRedisAndMemory(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -446,6 +449,7 @@ func TestWSMemoryDeny(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -573,6 +577,7 @@ func TestWSMemoryStop(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -665,6 +670,7 @@ func TestWSMemoryOk(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -853,6 +859,7 @@ func publish2(t *testing.T, command string, broker string, topics ...string) {
assert.NoError(t, err)
assert.True(t, ret.Ok)
}
+
func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message {
return &websocketsv1.Message{
Topics: topics,