From 75ab1e16c64cfd0a6424fe4c546fdbc5e1b992dd Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Mon, 14 Jun 2021 16:39:02 +0300 Subject: - Rework redis with ws plugins Signed-off-by: Valery Piashchynski --- plugins/websockets/config.go | 43 ++++++++++++++++ plugins/websockets/memory/inMemory.go | 95 ----------------------------------- plugins/websockets/plugin.go | 77 ++++++++++++++++++++++++---- 3 files changed, 110 insertions(+), 105 deletions(-) delete mode 100644 plugins/websockets/memory/inMemory.go (limited to 'plugins/websockets') diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go index be4aaa82..93d9ac3b 100644 --- a/plugins/websockets/config.go +++ b/plugins/websockets/config.go @@ -7,14 +7,48 @@ import ( ) /* +# GLOBAL +redis: + addrs: + - 'localhost:6379' + websockets: # pubsubs should implement PubSub interface to be collected via endure.Collects pubsubs:["redis", "amqp", "memory"] + # OR local + redis: + addrs: + - 'localhost:6379' + # path used as websockets path path: "/ws" */ +type RedisConfig struct { + Addrs []string `mapstructure:"addrs"` + DB int `mapstructure:"db"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + MasterName string `mapstructure:"master_name"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + MaxRetries int `mapstructure:"max_retries"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge time.Duration `mapstructure:"max_conn_age"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` +} + // Config represents configuration for the ws plugin type Config struct { // http path for the websocket @@ -23,6 +57,8 @@ type Config struct { PubSubs []string `mapstructure:"pubsubs"` Middleware []string `mapstructure:"middleware"` + Redis *RedisConfig `mapstructure:"redis"` + Pool *pool.Config `mapstructure:"pool"` } @@ -55,4 +91,11 @@ func (c *Config) InitDefault() { } c.Pool.Supervisor.InitDefaults() } + + if c.Redis != nil { + if c.Redis.Addrs == nil { + // append default + c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379") + } + } } diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go deleted file mode 100644 index cef28182..00000000 --- a/plugins/websockets/memory/inMemory.go +++ /dev/null @@ -1,95 +0,0 @@ -package memory - -import ( - "sync" - - "github.com/spiral/roadrunner/v2/pkg/bst" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" - "github.com/spiral/roadrunner/v2/plugins/logger" - "google.golang.org/protobuf/proto" -) - -type Plugin struct { - sync.RWMutex - log logger.Logger - - // channel with the messages from the RPC - pushCh chan []byte - // user-subscribed topics - storage bst.Storage -} - -func NewInMemory(log logger.Logger) pubsub.PubSub { - return &Plugin{ - log: log, - pushCh: make(chan []byte, 10), - storage: bst.NewBST(), - } -} - -func (p *Plugin) Publish(message []byte) error { - p.pushCh <- message - return nil -} - -func (p *Plugin) PublishAsync(message []byte) { - go func() { - p.pushCh <- message - }() -} - -func (p *Plugin) Subscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Insert(connectionID, topics[i]) - } - return nil -} - -func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { - p.Lock() - defer p.Unlock() - for i := 0; i < len(topics); i++ { - p.storage.Remove(connectionID, topics[i]) - } - return nil -} - -func (p *Plugin) Connections(topic string, res map[string]struct{}) { - p.RLock() - defer p.RUnlock() - - ret := p.storage.Get(topic) - for rr := range ret { - res[rr] = struct{}{} - } -} - -func (p *Plugin) Next() (*websocketsv1.Message, error) { - msg := <-p.pushCh - if msg == nil { - return nil, nil - } - - p.RLock() - defer p.RUnlock() - - m := &websocketsv1.Message{} - err := proto.Unmarshal(msg, m) - if err != nil { - return nil, err - } - - // push only messages, which are subscribed - // TODO better??? - for i := 0; i < len(m.GetTopics()); i++ { - // if we have active subscribers - send a message to a topic - // or send nil instead - if ok := p.storage.Contains(m.GetTopics()[i]); ok { - return m, nil - } - } - return nil, nil -} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 6ddd609c..c53491b4 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -2,6 +2,7 @@ package websockets import ( "context" + "fmt" "net/http" "sync" "time" @@ -23,7 +24,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" - "github.com/spiral/roadrunner/v2/plugins/websockets/memory" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" "google.golang.org/protobuf/proto" @@ -38,8 +38,11 @@ type Plugin struct { // Collection with all available pubsubs pubsubs map[string]pubsub.PubSub - cfg *Config - log logger.Logger + psProviders map[string]pubsub.PSProvider + + cfg *Config + cfgPlugin config.Configurer + log logger.Logger // global connections map connections sync.Map @@ -69,9 +72,12 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } p.cfg.InitDefault() - p.pubsubs = make(map[string]pubsub.PubSub) + p.psProviders = make(map[string]pubsub.PSProvider) + p.log = log + p.cfgPlugin = cfg + p.wsUpgrade = &websocket.Upgrader{ HandshakeTimeout: time.Second * 60, ReadBufferSize: 1024, @@ -80,14 +86,18 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.serveExit = make(chan struct{}) p.server = server - // attach default driver - p.pubsubs["memory"] = memory.NewInMemory(p.log) - return nil } func (p *Plugin) Serve() chan error { errCh := make(chan error) + const op = errors.Op("websockets_plugin_serve") + + err := p.initPubSubs() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } go func() { var err error @@ -133,6 +143,54 @@ func (p *Plugin) Serve() chan error { return errCh } +func (p *Plugin) initPubSubs() error { + for i := 0; i < len(p.cfg.PubSubs); i++ { + // don't need to have a section for the in-memory + if p.cfg.PubSubs[i] == "memory" { + if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { + r, err := provider.PSProvide("") + if err != nil { + return err + } + + // append default in-memory provider + p.pubsubs["memory"] = r + } + continue + } + // key - memory, redis + if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { + // try local key + switch { + // try local config first + case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])): + r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])) + if err != nil { + return err + } + + // append redis provider + p.pubsubs[p.cfg.PubSubs[i]] = r + case p.cfgPlugin.Has(p.cfg.PubSubs[i]): + r, err := provider.PSProvide(p.cfg.PubSubs[i]) + if err != nil { + return err + } + + // append redis provider + p.pubsubs[p.cfg.PubSubs[i]] = r + default: + return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i]) + } + } else { + // no such driver + p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders) + } + } + + return nil +} + func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() @@ -167,8 +225,8 @@ func (p *Plugin) Name() string { } // GetPublishers collects all pubsubs -func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) { - p.pubsubs[name.Name()] = pub +func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) { + p.psProviders[name.Name()] = pub } func (p *Plugin) Middleware(next http.Handler) http.Handler { @@ -389,7 +447,6 @@ func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValid } } -// go:inline func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) { const op = errors.Op("exec") pd := payload.Payload{ -- cgit v1.2.3