summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-14 16:39:02 +0300
committerValery Piashchynski <[email protected]>2021-06-14 16:39:02 +0300
commit75ab1e16c64cfd0a6424fe4c546fdbc5e1b992dd (patch)
tree1e9a910071d20021ad0f7ef4fe6099bac6a341ef /plugins/websockets
parentdc8ed203c247afd684f198ebbac103a10bfad72a (diff)
- Rework redis with ws plugins
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/config.go43
-rw-r--r--plugins/websockets/memory/inMemory.go95
-rw-r--r--plugins/websockets/plugin.go77
3 files changed, 110 insertions, 105 deletions
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
index be4aaa82..93d9ac3b 100644
--- a/plugins/websockets/config.go
+++ b/plugins/websockets/config.go
@@ -7,14 +7,48 @@ import (
)
/*
+# GLOBAL
+redis:
+ addrs:
+ - 'localhost:6379'
+
websockets:
# pubsubs should implement PubSub interface to be collected via endure.Collects
pubsubs:["redis", "amqp", "memory"]
+ # OR local
+ redis:
+ addrs:
+ - 'localhost:6379'
+
# path used as websockets path
path: "/ws"
*/
+type RedisConfig struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
// Config represents configuration for the ws plugin
type Config struct {
// http path for the websocket
@@ -23,6 +57,8 @@ type Config struct {
PubSubs []string `mapstructure:"pubsubs"`
Middleware []string `mapstructure:"middleware"`
+ Redis *RedisConfig `mapstructure:"redis"`
+
Pool *pool.Config `mapstructure:"pool"`
}
@@ -55,4 +91,11 @@ func (c *Config) InitDefault() {
}
c.Pool.Supervisor.InitDefaults()
}
+
+ if c.Redis != nil {
+ if c.Redis.Addrs == nil {
+ // append default
+ c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379")
+ }
+ }
}
diff --git a/plugins/websockets/memory/inMemory.go b/plugins/websockets/memory/inMemory.go
deleted file mode 100644
index cef28182..00000000
--- a/plugins/websockets/memory/inMemory.go
+++ /dev/null
@@ -1,95 +0,0 @@
-package memory
-
-import (
- "sync"
-
- "github.com/spiral/roadrunner/v2/pkg/bst"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
-)
-
-type Plugin struct {
- sync.RWMutex
- log logger.Logger
-
- // channel with the messages from the RPC
- pushCh chan []byte
- // user-subscribed topics
- storage bst.Storage
-}
-
-func NewInMemory(log logger.Logger) pubsub.PubSub {
- return &Plugin{
- log: log,
- pushCh: make(chan []byte, 10),
- storage: bst.NewBST(),
- }
-}
-
-func (p *Plugin) Publish(message []byte) error {
- p.pushCh <- message
- return nil
-}
-
-func (p *Plugin) PublishAsync(message []byte) {
- go func() {
- p.pushCh <- message
- }()
-}
-
-func (p *Plugin) Subscribe(connectionID string, topics ...string) error {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(topics); i++ {
- p.storage.Insert(connectionID, topics[i])
- }
- return nil
-}
-
-func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error {
- p.Lock()
- defer p.Unlock()
- for i := 0; i < len(topics); i++ {
- p.storage.Remove(connectionID, topics[i])
- }
- return nil
-}
-
-func (p *Plugin) Connections(topic string, res map[string]struct{}) {
- p.RLock()
- defer p.RUnlock()
-
- ret := p.storage.Get(topic)
- for rr := range ret {
- res[rr] = struct{}{}
- }
-}
-
-func (p *Plugin) Next() (*websocketsv1.Message, error) {
- msg := <-p.pushCh
- if msg == nil {
- return nil, nil
- }
-
- p.RLock()
- defer p.RUnlock()
-
- m := &websocketsv1.Message{}
- err := proto.Unmarshal(msg, m)
- if err != nil {
- return nil, err
- }
-
- // push only messages, which are subscribed
- // TODO better???
- for i := 0; i < len(m.GetTopics()); i++ {
- // if we have active subscribers - send a message to a topic
- // or send nil instead
- if ok := p.storage.Contains(m.GetTopics()[i]); ok {
- return m, nil
- }
- }
- return nil, nil
-}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 6ddd609c..c53491b4 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -2,6 +2,7 @@ package websockets
import (
"context"
+ "fmt"
"net/http"
"sync"
"time"
@@ -23,7 +24,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
- "github.com/spiral/roadrunner/v2/plugins/websockets/memory"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
"google.golang.org/protobuf/proto"
@@ -38,8 +38,11 @@ type Plugin struct {
// Collection with all available pubsubs
pubsubs map[string]pubsub.PubSub
- cfg *Config
- log logger.Logger
+ psProviders map[string]pubsub.PSProvider
+
+ cfg *Config
+ cfgPlugin config.Configurer
+ log logger.Logger
// global connections map
connections sync.Map
@@ -69,9 +72,12 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.cfg.InitDefault()
-
p.pubsubs = make(map[string]pubsub.PubSub)
+ p.psProviders = make(map[string]pubsub.PSProvider)
+
p.log = log
+ p.cfgPlugin = cfg
+
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
ReadBufferSize: 1024,
@@ -80,14 +86,18 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
- // attach default driver
- p.pubsubs["memory"] = memory.NewInMemory(p.log)
-
return nil
}
func (p *Plugin) Serve() chan error {
errCh := make(chan error)
+ const op = errors.Op("websockets_plugin_serve")
+
+ err := p.initPubSubs()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
go func() {
var err error
@@ -133,6 +143,54 @@ func (p *Plugin) Serve() chan error {
return errCh
}
+func (p *Plugin) initPubSubs() error {
+ for i := 0; i < len(p.cfg.PubSubs); i++ {
+ // don't need to have a section for the in-memory
+ if p.cfg.PubSubs[i] == "memory" {
+ if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+ r, err := provider.PSProvide("")
+ if err != nil {
+ return err
+ }
+
+ // append default in-memory provider
+ p.pubsubs["memory"] = r
+ }
+ continue
+ }
+ // key - memory, redis
+ if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+ // try local key
+ switch {
+ // try local config first
+ case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
+ r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
+ if err != nil {
+ return err
+ }
+
+ // append redis provider
+ p.pubsubs[p.cfg.PubSubs[i]] = r
+ case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
+ r, err := provider.PSProvide(p.cfg.PubSubs[i])
+ if err != nil {
+ return err
+ }
+
+ // append redis provider
+ p.pubsubs[p.cfg.PubSubs[i]] = r
+ default:
+ return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
+ }
+ } else {
+ // no such driver
+ p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
+ }
+ }
+
+ return nil
+}
+
func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
@@ -167,8 +225,8 @@ func (p *Plugin) Name() string {
}
// GetPublishers collects all pubsubs
-func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) {
- p.pubsubs[name.Name()] = pub
+func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) {
+ p.psProviders[name.Name()] = pub
}
func (p *Plugin) Middleware(next http.Handler) http.Handler {
@@ -389,7 +447,6 @@ func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValid
}
}
-// go:inline
func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) {
const op = errors.Op("exec")
pd := payload.Payload{