summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--pkg/interface/broadcast/broadcast.go7
-rw-r--r--pkg/pubsub/interface.go5
-rw-r--r--plugins/broadcast/config.go19
-rw-r--r--plugins/broadcast/doc/broadcast_arch.drawio1
-rw-r--r--plugins/broadcast/plugin.go119
-rw-r--r--plugins/broadcast/rpc.go (renamed from plugins/websockets/rpc.go)2
-rw-r--r--plugins/kv/interface.go2
-rw-r--r--plugins/websockets/config.go59
-rw-r--r--plugins/websockets/executor/executor.go8
-rw-r--r--plugins/websockets/plugin.go194
-rw-r--r--plugins/websockets/pool/workers_pool.go4
-rw-r--r--tests/plugins/broadcast/broadcast_plugin_test.go207
-rw-r--r--tests/plugins/broadcast/configs/.rr-broadcast-init.yaml43
13 files changed, 483 insertions, 187 deletions
diff --git a/pkg/interface/broadcast/broadcast.go b/pkg/interface/broadcast/broadcast.go
new file mode 100644
index 00000000..c922c82e
--- /dev/null
+++ b/pkg/interface/broadcast/broadcast.go
@@ -0,0 +1,7 @@
+package broadcast
+
+import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+
+type Broadcaster interface {
+ GetDriver(key string) pubsub.SubReader
+}
diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go
index d021dbbe..30b544db 100644
--- a/pkg/pubsub/interface.go
+++ b/pkg/pubsub/interface.go
@@ -16,6 +16,11 @@ type PubSub interface {
Reader
}
+type SubReader interface {
+ Subscriber
+ Reader
+}
+
// Subscriber defines the ability to operate as message passing broker.
// BETA interface
type Subscriber interface {
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
new file mode 100644
index 00000000..18846f30
--- /dev/null
+++ b/plugins/broadcast/config.go
@@ -0,0 +1,19 @@
+package broadcast
+
+/*
+websockets: # <----- one of possible subscribers
+ path: /ws
+ broker: default # <------ broadcast broker to use --------------- |
+ | match
+broadcast: # <-------- broadcast entry point plugin |
+ default: # <----------------------------------------------------- |
+ driver: redis
+ test:
+ driver: memory
+
+*/
+
+// Config ...
+type Config struct {
+ Data map[string]interface{} `mapstructure:"broadcast"`
+}
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..b8d2947e
--- /dev/null
+++ b/plugins/broadcast/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-06-17T16:23:35.917Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="1VvfJYAxL9mW7TkXHKVj" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5s4F/41nml3xhkkcf2YW7ud2U6z8buz7acdbGSbLUZewLnsr38lEBgkOcHhojibtHHgIMlwdK6PLkzQ5ebhc+Jv119JgKMJNIKHCbqaQAhNaNM/jPJYUBDyYEFZJWFQ0MCeMAv/xZxocOouDHDaKJgREmXhtklckDjGi6xB85OE3DeLLUnU/Natv8ISYbbwI5n6Zxhka/5ghmHsL/yKw9U6E69s/LI0J6RrPyD3NRK6nqDLhJCsONo8XOKIsa9kTFHv04Gr1Z0lOM7aVLg0bfvLxed46nxH01l8t/5OPk9ts2jmzo92/JH53WaPJQ8SsosDzFoxJujifh1meLb1F+zqPe12Sltnm4ieAXq4DKPokkQkyeuiwMfuckHpaZaQn7h2xV64eL6kV/gN4CTDDwcfDVQMo7KGyQZnySMtwisgl/OYi5lp8fP7Rp8VtHW9uzxO9LmcrKq295ykB5yZRzDWsodl7NJi/5SMzX9YDRJnNXrx0w/DgQGaHHccmeOmq+A4sq2hOG69bY7DJsenlUBrZPnA1kM3ywUhR4alm+OexF8cUIfFT0mSrcmKxH50vadeNHtgX+Y3Qrac73/jLHvk3tffZaTZK/ghzL6z6mcWP/vBG2PHVw/1k8fyJKaPW6vETn+U7bGTfbX8rKxXPB97qKd7jfKA7JIFfoJXZbzgJyucPVXOUItBgiM/C++aN6LqUV71hoT0FivxcaBxBkzoWg7/bMiSZQiep7hN3oYgJdVNvVxwSonUJTmwLjqgpejUBacmRwdERzQe7gIvlKHH3LVYhKBF2IqAq6uwnSeJ/1grsGVikx6WRdu1GtJnm0KsKJYH3cq74OnyyDA6lbc8S1CSgiO9qgyQnNvF7bfzq8vz2f/69XJtBbWTN7MtgYWKiA2YCmdmDRUimzotEqiZo71fO84gwWMt0nIJ1R0d2HPbsvu1SLCt++tokDrJAJR07Pb66suMkm7+uJj9cdGvoi2xfYD/jjc3elI0UwjUqwixrmhwTEXT6vr7ULRXrWfoFPQMSXr29frrt9sfb0nRlB5tVEVzTl3RnFetafYpaJqM+51//f1mgj6x/29I22xbt7a5Eqfpo0dsFIB+WV7dv/PDyJ9TJtMn3c3T3ZweBAmVgSSVuoCyJmvy2Y/CVUyPF5RFmLL0gjEwXPjROb+wCYOgUGKchv/mX1R0IM/HaLvWxcS6Ym1RvU0LFQZSV8UkxsMg4ZYp95IaCR+qlwDSaRRPDbIy2po4rTauvM16apwQP1j4KVO+kKnLklmvPs1cYGE3MFVmzoVzZNv9KFBl1o6N3s3BFMh+V6D2CtQa9LU6KtCLcDhHcKLl+OMh3Mtz3E7l7WfKA+AZT1UYBlizoF6JdtqLtA7RNLva9peJpoMakgDcI0VHqDCQ6GiNJpopltM2x3r91hB6OkTOFSTI5LHiIZETywNgHSujQo2BBg7kkYMJtCOWXWwbsmv/s2Ozfy42tH9CmlCc06vG9oF+5nJgFPRpxsSUXTNr11i6MuUZCrvGk5SqTXq04n/9DYusIvmsFqhxcrNKeZYXnie1JgtKSaiCv/wG+Bd9movlKW0r0tZSq/2xiF+L8DJ7nn95KYl7n3F2leeLH37ix/zGkjBefaRHs938FvtB44lrTycYqXTtb9lhkk9Sq9sWQhtfRvnEMBbwHgiLX4q7dIuFhVE9pEj5q5l69VjYGSwWfp//cIT1t9pa/2EmQEjW1xKhoIHnPAB5RtifeJ6SxU+cyfhPl9x0DoJgaaj0ERgO8noCdwBAIuLtnMlTkpTpKRpqRhJwJCZ/icPsA4PaKr/wUeL2m0DbpEl5NlSgBePCbTIqeosXmGkxNOatQJo32jUKJBTYY3ZNZbhPKXep+64W40M9uq+yH553X3AY9wVFLB0ZZwxg9yz+2cqZHZsUgSp6EiD8g0mOXMEVZHWAHAdqnb1zYpFYe1HuBZVURmJnwECm6ULHcm3YhHpMcRXEwHFZyY6aj/p2Hxf+6SdLqIyUpkkhiXM3NclX9yzD1ZuI2IDCC406nAD1jsc1vBBs6YVA0ws9O++uT9WFbVW3axLVrVehpFIlLhGEdyUscb/PfhjmUeEWtSKKWhUcFM/TbQGecEXNgZMAL/1dlHVpzo8ico+Dv0gSViDPHov5pQHLvPhLtvmCvGbTtJ379Pnmu9idMnYuUChNZggJK1lMT5E2qib/DobkwPdRzSOMUGskxxkofnC9s/1CFhM6DWlCY8cPMq5TIQ1ne8S2MlLcQjHUdipi23Uc9/TCC0OY1G8iObyAyhVqg+m1DAfNqDgwYcIPeEE1kjL6MhdJP6B/NjhN/RWmRtiImBafYCeIMR5SIQ3jxnjuu3Ftb1y9lsYV9bJ0q8UgZbXudCx76kk6+y1b54nYTbRb5SmYjF29/rlckmbSFFg/YI7k7Pe/C5gDw1MYy1ERcySP1r8j5oX6qMCKUSFzBHU6spODzJHZ1pXZw7gyETL3PC2QOSh34WmNmdMaI4Dm6H0u/BDC7A4Wlz0BmnsjT2Yo2aETNA+MBcZQFbZZtgWcoRIqr+3CvsESKqR394yGHzoB0By1XanXOaXq1quqPbokKLlIhfrDyzOcZrBDY4dmY44HWo9kBkTQ2rN0g9ZI6wLfU/Pfblsj0Mvk82NBa3ds/y1PzHptoPVIei2C1q4CAhgXtEYyADbDyV3P6xf1BE+uYpvAcYOnsuF3q9nCapqtV4kNlMKLaLQoFgObSVNG45pYtIwIddo7wQ+8QLnDpI9NgOBASmnrh6FNKDH6PwRDi/CP9nnbprxDzzsK3YjwtYHQ5ivaqO4EQGizbfJvDoTbiSA0bGLQwDVbebTOKDQ6EoPmru9geWkSV6P8MIi1qXnb2NOK3dpKvjXcgrsnEOupaCGHDuVkeG10xHrs+E5fkqUXmjo1hNpsC051TrO69aoMGh1CqIvs6LJQoX7RanBMY9rR6ZFUXvTG1WrjZ9HpwTK698Xx7Q1AiUE+76t72Y3nWHR6CkZ21iU/XjE8PZJii/D0VLED3LjwtCVjYv3D05qQsCnQPrhvv/lZOVrfjGE5LS2t1i0tLXndwu3NZRNvOxTbpFs/LmkVVHd2s5tHYbr+wFc4fKxFQvUKvSoxBoGFHVXPeraD/L72BULNfYEc1RCTM6oKa91a4SgVpic3OAnpczM/+mrCobb5kNV1hVk3LZXzoV2qCDPK7bZ2m+h8wZYc7aHv3/w5jm5IGubIA7qakywjm4mMjedbijV26tplURhTpSpfNWn0o06OIbw8R/H2QqTQJnswbZKBmxmOg0m1WosJCmHfTH+5mVN0wpsYj0ACSOuo9gNW2brBhiNs2VPdJOQuDFi3NHbiKzqJNIZP3+i+57bwgijYdqu6F3QTPd2/tbXIv/Zvv0XX/wc=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
new file mode 100644
index 00000000..3b771746
--- /dev/null
+++ b/plugins/broadcast/plugin.go
@@ -0,0 +1,119 @@
+package broadcast
+
+import (
+ "sync"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
+)
+
+const PluginName string = "broadcast"
+
+type Plugin struct {
+ sync.RWMutex
+ log logger.Logger
+ // publishers implement Publisher interface
+ // and able to receive a payload
+ publishers map[string]pubsub.Publisher
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("broadcast_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ p.publishers = make(map[string]pubsub.Publisher)
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectPublishers,
+ }
+}
+
+// CollectPublishers collect all plugins who implement pubsub.Publisher interface
+func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) {
+ p.publishers[name.Name()] = subscriber
+}
+
+// Publish is an entry point to the websocket PUBSUB
+func (p *Plugin) Publish(m []byte) error {
+ p.Lock()
+ defer p.Unlock()
+
+ const op = errors.Op("broadcast_plugin_publish")
+
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err = p.publishers[j].Publish(m)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+ }
+
+ p.log.Warn("no publishers registered")
+ }
+
+ return nil
+}
+
+func (p *Plugin) PublishAsync(m []byte) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ p.log.Error("message unmarshal")
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.publishers[msg.GetBroker()]; ok {
+ err := br.Publish(m)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker())
+ }
+ }
+ }()
+}
+
+func (p *Plugin) GetDriver(key string) pubsub.SubReader {
+ println(key)
+ return nil
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
diff --git a/plugins/websockets/rpc.go b/plugins/broadcast/rpc.go
index 341e0b2a..fa853421 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -1,4 +1,4 @@
-package websockets
+package broadcast
import (
"github.com/spiral/errors"
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 5aedd5c3..fd906041 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -36,6 +36,6 @@ type StorageDriver interface {
// Provider provides storage based on the config
type Provider interface {
- // Provide provides Storage based on the config key
+ // KVProvide provides Storage based on the config key
KVProvide(key string) (Storage, error)
}
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
index deb4406c..b1d5d0a8 100644
--- a/plugins/websockets/config.go
+++ b/plugins/websockets/config.go
@@ -8,55 +8,16 @@ import (
)
/*
-# GLOBAL
-redis:
- addrs:
- - 'localhost:6379'
-
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
-
- pubsubs:["redis", "amqp", "memory"]
- # OR local
- redis:
- addrs:
- - 'localhost:6379'
-
- # path used as websockets path
+ broker: default
+ allowed_origin: "*"
path: "/ws"
*/
-type RedisConfig struct {
- Addrs []string `mapstructure:"addrs"`
- DB int `mapstructure:"db"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- MasterName string `mapstructure:"master_name"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- MaxRetries int `mapstructure:"max_retries"`
- DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge time.Duration `mapstructure:"max_conn_age"`
- ReadTimeout time.Duration `mapstructure:"read_timeout"`
- WriteTimeout time.Duration `mapstructure:"write_timeout"`
- PoolTimeout time.Duration `mapstructure:"pool_timeout"`
- IdleTimeout time.Duration `mapstructure:"idle_timeout"`
- IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
-}
-
// Config represents configuration for the ws plugin
type Config struct {
// http path for the websocket
Path string `mapstructure:"path"`
- // ["redis", "amqp", "memory"]
- PubSubs []string `mapstructure:"pubsubs"`
- Middleware []string `mapstructure:"middleware"`
AllowedOrigin string `mapstructure:"allowed_origin"`
@@ -65,8 +26,8 @@ type Config struct {
allowedOrigins []string
allowedAll bool
- Redis *RedisConfig `mapstructure:"redis"`
- Pool *pool.Config `mapstructure:"pool"`
+ // Pool with the workers for the websockets
+ Pool *pool.Config `mapstructure:"pool"`
}
// InitDefault initialize default values for the ws config
@@ -75,11 +36,6 @@ func (c *Config) InitDefault() {
c.Path = "/ws"
}
- if len(c.PubSubs) == 0 {
- // memory used by default
- c.PubSubs = append(c.PubSubs, "memory")
- }
-
if c.Pool == nil {
c.Pool = &pool.Config{}
if c.Pool.NumWorkers == 0 {
@@ -99,13 +55,6 @@ func (c *Config) InitDefault() {
}
}
- if c.Redis != nil {
- if c.Redis.Addrs == nil {
- // append default
- c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379")
- }
- }
-
if c.AllowedOrigin == "" {
c.AllowedOrigin = "*"
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 5f904d26..07f22043 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -29,7 +29,7 @@ type Executor struct {
connID string
// map with the pubsub drivers
- pubsub map[string]pubsub.PubSub
+ pubsub map[string]pubsub.Subscriber
actualTopics map[string]struct{}
req *http.Request
@@ -38,7 +38,7 @@ type Executor struct {
// NewExecutor creates protected connection and starts command loop
func NewExecutor(conn *connection.Connection, log logger.Logger,
- connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor {
+ connID string, pubsubs map[string]pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
@@ -170,7 +170,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
}
-func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Set(br pubsub.Subscriber, topics []string) error {
// associate connection with topics
err := br.Subscribe(e.connID, topics...)
if err != nil {
@@ -188,7 +188,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
return nil
}
-func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error {
// remove associated connections from the storage
err := br.Unsubscribe(e.connID, topics...)
if err != nil {
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 8b708187..cf861c72 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -2,7 +2,6 @@ package websockets
import (
"context"
- "fmt"
"net/http"
"sync"
"time"
@@ -12,10 +11,10 @@ import (
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/broadcast"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -26,7 +25,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "google.golang.org/protobuf/proto"
)
const (
@@ -36,9 +34,11 @@ const (
type Plugin struct {
sync.RWMutex
// Collection with all available pubsubs
- pubsubs map[string]pubsub.PubSub
+ //pubsubs map[string]pubsub.PubSub
- psProviders map[string]pubsub.PSProvider
+ //psProviders map[string]pubsub.PSProvider
+
+ subReaders map[string]pubsub.SubReader
cfg *Config
cfgPlugin config.Configurer
@@ -60,7 +60,7 @@ type Plugin struct {
accessValidator validator.AccessValidatorFn
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error {
const op = errors.Op("websockets_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -72,8 +72,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.cfg.InitDefault()
- p.pubsubs = make(map[string]pubsub.PubSub)
- p.psProviders = make(map[string]pubsub.PSProvider)
+ //p.pubsubs = make(map[string]pubsub.PubSub)
+ //p.psProviders = make(map[string]pubsub.PSProvider)
+
+ p.subReaders = make(map[string]pubsub.SubReader)
p.log = log
p.cfgPlugin = cfg
@@ -96,11 +98,11 @@ func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("websockets_plugin_serve")
- err := p.initPubSubs()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
+ //err := p.initPubSubs()
+ //if err != nil {
+ // errCh <- errors.E(op, err)
+ // return errCh
+ //}
go func() {
var err error
@@ -122,11 +124,11 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
- p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log)
+ p.workersPool = pool.NewWorkersPool(p.subReaders, &p.connections, p.log)
// run all pubsubs drivers
- for _, v := range p.pubsubs {
- go func(ps pubsub.PubSub) {
+ for _, v := range p.subReaders {
+ go func(ps pubsub.SubReader) {
for {
select {
case <-p.serveExit:
@@ -146,53 +148,53 @@ func (p *Plugin) Serve() chan error {
return errCh
}
-func (p *Plugin) initPubSubs() error {
- for i := 0; i < len(p.cfg.PubSubs); i++ {
- // don't need to have a section for the in-memory
- if p.cfg.PubSubs[i] == "memory" {
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- r, err := provider.PSProvide("")
- if err != nil {
- return err
- }
-
- // append default in-memory provider
- p.pubsubs["memory"] = r
- }
- continue
- }
- // key - memory, redis
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- // try local key
- switch {
- // try local config first
- case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
- r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
- r, err := provider.PSProvide(p.cfg.PubSubs[i])
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- default:
- return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
- }
- } else {
- // no such driver
- p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
- }
- }
-
- return nil
-}
+//func (p *Plugin) initPubSubs() error {
+// for i := 0; i < len(p.cfg.PubSubs); i++ {
+// // don't need to have a section for the in-memory
+// if p.cfg.PubSubs[i] == "memory" {
+// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+// r, err := provider.PSProvide("")
+// if err != nil {
+// return err
+// }
+//
+// // append default in-memory provider
+// p.pubsubs["memory"] = r
+// }
+// continue
+// }
+// // key - memory, redis
+// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+// // try local key
+// switch {
+// // try local config first
+// case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
+// r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
+// if err != nil {
+// return err
+// }
+//
+// // append redis provider
+// p.pubsubs[p.cfg.PubSubs[i]] = r
+// case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
+// r, err := provider.PSProvide(p.cfg.PubSubs[i])
+// if err != nil {
+// return err
+// }
+//
+// // append redis provider
+// p.pubsubs[p.cfg.PubSubs[i]] = r
+// default:
+// return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
+// }
+// } else {
+// // no such driver
+// p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
+// }
+// }
+//
+// return nil
+//}
func (p *Plugin) Stop() error {
// close workers pool
@@ -210,26 +212,19 @@ func (p *Plugin) Stop() error {
func (p *Plugin) Collects() []interface{} {
return []interface{}{
- p.GetPublishers,
+ p.GetSubsReader,
}
}
func (p *Plugin) Available() {}
-func (p *Plugin) RPC() interface{} {
- return &rpc{
- plugin: p,
- log: p.log,
- }
-}
-
func (p *Plugin) Name() string {
return PluginName
}
-// GetPublishers collects all pubsubs
-func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) {
- p.psProviders[name.Name()] = pub
+// GetSubsReader collects all plugins which implement SubReader interface
+func (p *Plugin) GetSubsReader(name endure.Named, pub pubsub.SubReader) {
+ p.subReaders[name.Name()] = pub
}
func (p *Plugin) Middleware(next http.Handler) http.Handler {
@@ -277,7 +272,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.connections.Store(connectionID, safeConn)
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, nil, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
err = e.StartCommandLoop()
@@ -361,55 +356,6 @@ func (p *Plugin) Reset() error {
return nil
}
-// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(m []byte) error {
- p.Lock()
- defer p.Unlock()
-
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- return err
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- return nil
-}
-
-func (p *Plugin) PublishAsync(m []byte) {
- go func() {
- p.Lock()
- defer p.Unlock()
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- p.log.Error("message unmarshal")
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- p.log.Error("publish async error", "error", err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- }()
-}
-
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
const op = errors.Op("access_validator")
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index a196d1f0..22042d8d 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -12,7 +12,7 @@ import (
)
type WorkersPool struct {
- storage map[string]pubsub.PubSub
+ storage map[string]pubsub.SubReader
connections *sync.Map
resPool sync.Pool
log logger.Logger
@@ -22,7 +22,7 @@ type WorkersPool struct {
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(pubsubs map[string]pubsub.SubReader, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
queue: make(chan *websocketsv1.Message, 100),
diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go
new file mode 100644
index 00000000..65ee4415
--- /dev/null
+++ b/tests/plugins/broadcast/broadcast_plugin_test.go
@@ -0,0 +1,207 @@
+package broadcast
+
+import (
+ "net"
+ "net/http"
+ "net/rpc"
+ "net/url"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "testing"
+ "time"
+
+ "github.com/fasthttp/websocket"
+ json "github.com/json-iterator/go"
+ endure "github.com/spiral/endure/pkg/container"
+ goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/memory"
+ "github.com/spiral/roadrunner/v2/plugins/redis"
+ rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
+ "github.com/spiral/roadrunner/v2/plugins/server"
+ "github.com/spiral/roadrunner/v2/plugins/websockets"
+ "github.com/spiral/roadrunner/v2/utils"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestBroadcastInit(t *testing.T) {
+ cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel))
+ assert.NoError(t, err)
+
+ cfg := &config.Viper{
+ Path: "configs/.rr-broadcast-init.yaml",
+ Prefix: "rr",
+ }
+
+ err = cont.RegisterAll(
+ cfg,
+ &broadcast.Plugin{},
+ &rpcPlugin.Plugin{},
+ &logger.ZapLogger{},
+ &server.Plugin{},
+ &redis.Plugin{},
+ &websockets.Plugin{},
+ &httpPlugin.Plugin{},
+ &memory.Plugin{},
+ )
+
+ assert.NoError(t, err)
+
+ err = cont.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ ch, err := cont.Serve()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ sig := make(chan os.Signal, 1)
+ signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
+
+ wg := &sync.WaitGroup{}
+ wg.Add(1)
+
+ stopCh := make(chan struct{}, 1)
+
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case e := <-ch:
+ assert.Fail(t, "error", e.Error.Error())
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ case <-sig:
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ case <-stopCh:
+ // timeout
+ err = cont.Stop()
+ if err != nil {
+ assert.FailNow(t, "error", err.Error())
+ }
+ return
+ }
+ }
+ }()
+
+ time.Sleep(time.Second * 1)
+ //t.Run("TestWSInit", wsInit)
+
+ stopCh <- struct{}{}
+
+ wg.Wait()
+}
+
+func wsInit(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+
+ defer func() {
+ _ = resp.Body.Close()
+ }()
+
+ d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2"))
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
+}
+
+func publishAsync(t *testing.T, command string, broker string, topics ...string) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ panic(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ ret := &websocketsv1.Response{}
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret)
+ assert.NoError(t, err)
+ assert.True(t, ret.Ok)
+}
+
+func publishAsync2(t *testing.T, command string, broker string, topics ...string) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ panic(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ ret := &websocketsv1.Response{}
+ err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
+ assert.NoError(t, err)
+ assert.True(t, ret.Ok)
+}
+
+func publish2(t *testing.T, command string, broker string, topics ...string) {
+ conn, err := net.Dial("tcp", "127.0.0.1:6001")
+ if err != nil {
+ panic(err)
+ }
+
+ client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+
+ ret := &websocketsv1.Response{}
+ err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret)
+ assert.NoError(t, err)
+ assert.True(t, ret.Ok)
+}
+
+func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message {
+ return &websocketsv1.Message{
+ Topics: topics,
+ Command: command,
+ Broker: broker,
+ Payload: payload,
+ }
+}
+
+func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request {
+ m := &websocketsv1.Request{
+ Messages: []*websocketsv1.Message{
+ {
+ Topics: topics,
+ Command: command,
+ Broker: broker,
+ Payload: payload,
+ },
+ },
+ }
+
+ return m
+}
diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
new file mode 100644
index 00000000..fa4116d0
--- /dev/null
+++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml
@@ -0,0 +1,43 @@
+rpc:
+ listen: tcp://127.0.0.1:6001
+
+server:
+ command: "php ../../psr-worker-bench.php"
+ user: ""
+ group: ""
+ relay: "pipes"
+ relay_timeout: "20s"
+
+http:
+ address: 127.0.0.1:11111
+ max_request_size: 1024
+ middleware: [ "websockets" ]
+ trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
+ pool:
+ num_workers: 2
+ max_jobs: 0
+ allocate_timeout: 60s
+ destroy_timeout: 60s
+
+redis:
+ addrs:
+ - "localhost:6379"
+
+broadcast:
+ default:
+ driver: redis
+ test:
+ driver: memory
+
+websockets:
+ pubsubs: [ "redis" ]
+ path: "/ws"
+
+logs:
+ mode: development
+ level: error
+
+endure:
+ grace_period: 120s
+ print_graph: false
+ log_level: error