summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
committerValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
commit68ff941c4226074206ceed9c30bd95317aa0e9fc (patch)
tree693306256281cccefb29f4eedb7f617a9022154e /plugins/websockets
parent25e0841c6aa5e2686da5b9f74e3d77d3814ff592 (diff)
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/config.go59
-rw-r--r--plugins/websockets/executor/executor.go8
-rw-r--r--plugins/websockets/plugin.go194
-rw-r--r--plugins/websockets/pool/workers_pool.go4
-rw-r--r--plugins/websockets/rpc.go75
5 files changed, 80 insertions, 260 deletions
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
index deb4406c..b1d5d0a8 100644
--- a/plugins/websockets/config.go
+++ b/plugins/websockets/config.go
@@ -8,55 +8,16 @@ import (
)
/*
-# GLOBAL
-redis:
- addrs:
- - 'localhost:6379'
-
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
-
- pubsubs:["redis", "amqp", "memory"]
- # OR local
- redis:
- addrs:
- - 'localhost:6379'
-
- # path used as websockets path
+ broker: default
+ allowed_origin: "*"
path: "/ws"
*/
-type RedisConfig struct {
- Addrs []string `mapstructure:"addrs"`
- DB int `mapstructure:"db"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- MasterName string `mapstructure:"master_name"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- MaxRetries int `mapstructure:"max_retries"`
- DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge time.Duration `mapstructure:"max_conn_age"`
- ReadTimeout time.Duration `mapstructure:"read_timeout"`
- WriteTimeout time.Duration `mapstructure:"write_timeout"`
- PoolTimeout time.Duration `mapstructure:"pool_timeout"`
- IdleTimeout time.Duration `mapstructure:"idle_timeout"`
- IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
-}
-
// Config represents configuration for the ws plugin
type Config struct {
// http path for the websocket
Path string `mapstructure:"path"`
- // ["redis", "amqp", "memory"]
- PubSubs []string `mapstructure:"pubsubs"`
- Middleware []string `mapstructure:"middleware"`
AllowedOrigin string `mapstructure:"allowed_origin"`
@@ -65,8 +26,8 @@ type Config struct {
allowedOrigins []string
allowedAll bool
- Redis *RedisConfig `mapstructure:"redis"`
- Pool *pool.Config `mapstructure:"pool"`
+ // Pool with the workers for the websockets
+ Pool *pool.Config `mapstructure:"pool"`
}
// InitDefault initialize default values for the ws config
@@ -75,11 +36,6 @@ func (c *Config) InitDefault() {
c.Path = "/ws"
}
- if len(c.PubSubs) == 0 {
- // memory used by default
- c.PubSubs = append(c.PubSubs, "memory")
- }
-
if c.Pool == nil {
c.Pool = &pool.Config{}
if c.Pool.NumWorkers == 0 {
@@ -99,13 +55,6 @@ func (c *Config) InitDefault() {
}
}
- if c.Redis != nil {
- if c.Redis.Addrs == nil {
- // append default
- c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379")
- }
- }
-
if c.AllowedOrigin == "" {
c.AllowedOrigin = "*"
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 5f904d26..07f22043 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -29,7 +29,7 @@ type Executor struct {
connID string
// map with the pubsub drivers
- pubsub map[string]pubsub.PubSub
+ pubsub map[string]pubsub.Subscriber
actualTopics map[string]struct{}
req *http.Request
@@ -38,7 +38,7 @@ type Executor struct {
// NewExecutor creates protected connection and starts command loop
func NewExecutor(conn *connection.Connection, log logger.Logger,
- connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor {
+ connID string, pubsubs map[string]pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
@@ -170,7 +170,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
}
-func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Set(br pubsub.Subscriber, topics []string) error {
// associate connection with topics
err := br.Subscribe(e.connID, topics...)
if err != nil {
@@ -188,7 +188,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
return nil
}
-func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error {
// remove associated connections from the storage
err := br.Unsubscribe(e.connID, topics...)
if err != nil {
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 8b708187..cf861c72 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -2,7 +2,6 @@ package websockets
import (
"context"
- "fmt"
"net/http"
"sync"
"time"
@@ -12,10 +11,10 @@ import (
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/broadcast"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -26,7 +25,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "google.golang.org/protobuf/proto"
)
const (
@@ -36,9 +34,11 @@ const (
type Plugin struct {
sync.RWMutex
// Collection with all available pubsubs
- pubsubs map[string]pubsub.PubSub
+ //pubsubs map[string]pubsub.PubSub
- psProviders map[string]pubsub.PSProvider
+ //psProviders map[string]pubsub.PSProvider
+
+ subReaders map[string]pubsub.SubReader
cfg *Config
cfgPlugin config.Configurer
@@ -60,7 +60,7 @@ type Plugin struct {
accessValidator validator.AccessValidatorFn
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error {
const op = errors.Op("websockets_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -72,8 +72,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.cfg.InitDefault()
- p.pubsubs = make(map[string]pubsub.PubSub)
- p.psProviders = make(map[string]pubsub.PSProvider)
+ //p.pubsubs = make(map[string]pubsub.PubSub)
+ //p.psProviders = make(map[string]pubsub.PSProvider)
+
+ p.subReaders = make(map[string]pubsub.SubReader)
p.log = log
p.cfgPlugin = cfg
@@ -96,11 +98,11 @@ func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("websockets_plugin_serve")
- err := p.initPubSubs()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
+ //err := p.initPubSubs()
+ //if err != nil {
+ // errCh <- errors.E(op, err)
+ // return errCh
+ //}
go func() {
var err error
@@ -122,11 +124,11 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
- p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log)
+ p.workersPool = pool.NewWorkersPool(p.subReaders, &p.connections, p.log)
// run all pubsubs drivers
- for _, v := range p.pubsubs {
- go func(ps pubsub.PubSub) {
+ for _, v := range p.subReaders {
+ go func(ps pubsub.SubReader) {
for {
select {
case <-p.serveExit:
@@ -146,53 +148,53 @@ func (p *Plugin) Serve() chan error {
return errCh
}
-func (p *Plugin) initPubSubs() error {
- for i := 0; i < len(p.cfg.PubSubs); i++ {
- // don't need to have a section for the in-memory
- if p.cfg.PubSubs[i] == "memory" {
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- r, err := provider.PSProvide("")
- if err != nil {
- return err
- }
-
- // append default in-memory provider
- p.pubsubs["memory"] = r
- }
- continue
- }
- // key - memory, redis
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- // try local key
- switch {
- // try local config first
- case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
- r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
- r, err := provider.PSProvide(p.cfg.PubSubs[i])
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- default:
- return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
- }
- } else {
- // no such driver
- p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
- }
- }
-
- return nil
-}
+//func (p *Plugin) initPubSubs() error {
+// for i := 0; i < len(p.cfg.PubSubs); i++ {
+// // don't need to have a section for the in-memory
+// if p.cfg.PubSubs[i] == "memory" {
+// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+// r, err := provider.PSProvide("")
+// if err != nil {
+// return err
+// }
+//
+// // append default in-memory provider
+// p.pubsubs["memory"] = r
+// }
+// continue
+// }
+// // key - memory, redis
+// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+// // try local key
+// switch {
+// // try local config first
+// case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
+// r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
+// if err != nil {
+// return err
+// }
+//
+// // append redis provider
+// p.pubsubs[p.cfg.PubSubs[i]] = r
+// case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
+// r, err := provider.PSProvide(p.cfg.PubSubs[i])
+// if err != nil {
+// return err
+// }
+//
+// // append redis provider
+// p.pubsubs[p.cfg.PubSubs[i]] = r
+// default:
+// return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
+// }
+// } else {
+// // no such driver
+// p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
+// }
+// }
+//
+// return nil
+//}
func (p *Plugin) Stop() error {
// close workers pool
@@ -210,26 +212,19 @@ func (p *Plugin) Stop() error {
func (p *Plugin) Collects() []interface{} {
return []interface{}{
- p.GetPublishers,
+ p.GetSubsReader,
}
}
func (p *Plugin) Available() {}
-func (p *Plugin) RPC() interface{} {
- return &rpc{
- plugin: p,
- log: p.log,
- }
-}
-
func (p *Plugin) Name() string {
return PluginName
}
-// GetPublishers collects all pubsubs
-func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) {
- p.psProviders[name.Name()] = pub
+// GetSubsReader collects all plugins which implement SubReader interface
+func (p *Plugin) GetSubsReader(name endure.Named, pub pubsub.SubReader) {
+ p.subReaders[name.Name()] = pub
}
func (p *Plugin) Middleware(next http.Handler) http.Handler {
@@ -277,7 +272,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.connections.Store(connectionID, safeConn)
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, nil, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
err = e.StartCommandLoop()
@@ -361,55 +356,6 @@ func (p *Plugin) Reset() error {
return nil
}
-// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(m []byte) error {
- p.Lock()
- defer p.Unlock()
-
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- return err
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- return nil
-}
-
-func (p *Plugin) PublishAsync(m []byte) {
- go func() {
- p.Lock()
- defer p.Unlock()
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- p.log.Error("message unmarshal")
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- p.log.Error("publish async error", "error", err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- }()
-}
-
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
const op = errors.Op("access_validator")
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index a196d1f0..22042d8d 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -12,7 +12,7 @@ import (
)
type WorkersPool struct {
- storage map[string]pubsub.PubSub
+ storage map[string]pubsub.SubReader
connections *sync.Map
resPool sync.Pool
log logger.Logger
@@ -22,7 +22,7 @@ type WorkersPool struct {
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(pubsubs map[string]pubsub.SubReader, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
queue: make(chan *websocketsv1.Message, 100),
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
deleted file mode 100644
index 341e0b2a..00000000
--- a/plugins/websockets/rpc.go
+++ /dev/null
@@ -1,75 +0,0 @@
-package websockets
-
-import (
- "github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "google.golang.org/protobuf/proto"
-)
-
-// rpc collectors struct
-type rpc struct {
- plugin *Plugin
- log logger.Logger
-}
-
-// Publish ... msg is a proto decoded payload
-// see: pkg/pubsub/message.fbs
-func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
- const op = errors.Op("broadcast_publish")
-
- // just return in case of nil message
- if in == nil {
- out.Ok = false
- return nil
- }
-
- r.log.Debug("message published", "msg", in.Messages)
-
- msgLen := len(in.GetMessages())
-
- for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- return errors.E(op, err)
- }
-
- err = r.plugin.Publish(bb)
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
- }
- }
-
- out.Ok = true
- return nil
-}
-
-// PublishAsync ...
-// see: pkg/pubsub/message.fbs
-func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
- const op = errors.Op("publish_async")
-
- // just return in case of nil message
- if in == nil {
- out.Ok = false
- return nil
- }
-
- r.log.Debug("message published", "msg", in.Messages)
-
- msgLen := len(in.GetMessages())
-
- for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
- }
-
- r.plugin.PublishAsync(bb)
- }
-
- out.Ok = true
- return nil
-}