diff options
-rw-r--r-- | pkg/interface/broadcast/broadcast.go | 7 | ||||
-rw-r--r-- | pkg/pubsub/interface.go | 5 | ||||
-rw-r--r-- | plugins/broadcast/config.go | 19 | ||||
-rw-r--r-- | plugins/broadcast/doc/broadcast_arch.drawio | 1 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 119 | ||||
-rw-r--r-- | plugins/broadcast/rpc.go (renamed from plugins/websockets/rpc.go) | 2 | ||||
-rw-r--r-- | plugins/kv/interface.go | 2 | ||||
-rw-r--r-- | plugins/websockets/config.go | 59 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 8 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 194 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 4 | ||||
-rw-r--r-- | tests/plugins/broadcast/broadcast_plugin_test.go | 207 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-init.yaml | 43 |
13 files changed, 483 insertions, 187 deletions
diff --git a/pkg/interface/broadcast/broadcast.go b/pkg/interface/broadcast/broadcast.go new file mode 100644 index 00000000..c922c82e --- /dev/null +++ b/pkg/interface/broadcast/broadcast.go @@ -0,0 +1,7 @@ +package broadcast + +import "github.com/spiral/roadrunner/v2/pkg/pubsub" + +type Broadcaster interface { + GetDriver(key string) pubsub.SubReader +} diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go index d021dbbe..30b544db 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/pubsub/interface.go @@ -16,6 +16,11 @@ type PubSub interface { Reader } +type SubReader interface { + Subscriber + Reader +} + // Subscriber defines the ability to operate as message passing broker. // BETA interface type Subscriber interface { diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go new file mode 100644 index 00000000..18846f30 --- /dev/null +++ b/plugins/broadcast/config.go @@ -0,0 +1,19 @@ +package broadcast + +/* +websockets: # <----- one of possible subscribers + path: /ws + broker: default # <------ broadcast broker to use --------------- | + | match +broadcast: # <-------- broadcast entry point plugin | + default: # <----------------------------------------------------- | + driver: redis + test: + driver: memory + +*/ + +// Config ... +type Config struct { + Data map[string]interface{} `mapstructure:"broadcast"` +} diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio new file mode 100644 index 00000000..b8d2947e --- /dev/null +++ b/plugins/broadcast/doc/broadcast_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-06-17T16:23:35.917Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="1VvfJYAxL9mW7TkXHKVj" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5s4F/41nml3xhkkcf2YW7ud2U6z8buz7acdbGSbLUZewLnsr38lEBgkOcHhojibtHHgIMlwdK6PLkzQ5ebhc+Jv119JgKMJNIKHCbqaQAhNaNM/jPJYUBDyYEFZJWFQ0MCeMAv/xZxocOouDHDaKJgREmXhtklckDjGi6xB85OE3DeLLUnU/Natv8ISYbbwI5n6Zxhka/5ghmHsL/yKw9U6E69s/LI0J6RrPyD3NRK6nqDLhJCsONo8XOKIsa9kTFHv04Gr1Z0lOM7aVLg0bfvLxed46nxH01l8t/5OPk9ts2jmzo92/JH53WaPJQ8SsosDzFoxJujifh1meLb1F+zqPe12Sltnm4ieAXq4DKPokkQkyeuiwMfuckHpaZaQn7h2xV64eL6kV/gN4CTDDwcfDVQMo7KGyQZnySMtwisgl/OYi5lp8fP7Rp8VtHW9uzxO9LmcrKq295ykB5yZRzDWsodl7NJi/5SMzX9YDRJnNXrx0w/DgQGaHHccmeOmq+A4sq2hOG69bY7DJsenlUBrZPnA1kM3ywUhR4alm+OexF8cUIfFT0mSrcmKxH50vadeNHtgX+Y3Qrac73/jLHvk3tffZaTZK/ghzL6z6mcWP/vBG2PHVw/1k8fyJKaPW6vETn+U7bGTfbX8rKxXPB97qKd7jfKA7JIFfoJXZbzgJyucPVXOUItBgiM/C++aN6LqUV71hoT0FivxcaBxBkzoWg7/bMiSZQiep7hN3oYgJdVNvVxwSonUJTmwLjqgpejUBacmRwdERzQe7gIvlKHH3LVYhKBF2IqAq6uwnSeJ/1grsGVikx6WRdu1GtJnm0KsKJYH3cq74OnyyDA6lbc8S1CSgiO9qgyQnNvF7bfzq8vz2f/69XJtBbWTN7MtgYWKiA2YCmdmDRUimzotEqiZo71fO84gwWMt0nIJ1R0d2HPbsvu1SLCt++tokDrJAJR07Pb66suMkm7+uJj9cdGvoi2xfYD/jjc3elI0UwjUqwixrmhwTEXT6vr7ULRXrWfoFPQMSXr29frrt9sfb0nRlB5tVEVzTl3RnFetafYpaJqM+51//f1mgj6x/29I22xbt7a5Eqfpo0dsFIB+WV7dv/PDyJ9TJtMn3c3T3ZweBAmVgSSVuoCyJmvy2Y/CVUyPF5RFmLL0gjEwXPjROb+wCYOgUGKchv/mX1R0IM/HaLvWxcS6Ym1RvU0LFQZSV8UkxsMg4ZYp95IaCR+qlwDSaRRPDbIy2po4rTauvM16apwQP1j4KVO+kKnLklmvPs1cYGE3MFVmzoVzZNv9KFBl1o6N3s3BFMh+V6D2CtQa9LU6KtCLcDhHcKLl+OMh3Mtz3E7l7WfKA+AZT1UYBlizoF6JdtqLtA7RNLva9peJpoMakgDcI0VHqDCQ6GiNJpopltM2x3r91hB6OkTOFSTI5LHiIZETywNgHSujQo2BBg7kkYMJtCOWXWwbsmv/s2Ozfy42tH9CmlCc06vG9oF+5nJgFPRpxsSUXTNr11i6MuUZCrvGk5SqTXq04n/9DYusIvmsFqhxcrNKeZYXnie1JgtKSaiCv/wG+Bd9movlKW0r0tZSq/2xiF+L8DJ7nn95KYl7n3F2leeLH37ix/zGkjBefaRHs938FvtB44lrTycYqXTtb9lhkk9Sq9sWQhtfRvnEMBbwHgiLX4q7dIuFhVE9pEj5q5l69VjYGSwWfp//cIT1t9pa/2EmQEjW1xKhoIHnPAB5RtifeJ6SxU+cyfhPl9x0DoJgaaj0ERgO8noCdwBAIuLtnMlTkpTpKRpqRhJwJCZ/icPsA4PaKr/wUeL2m0DbpEl5NlSgBePCbTIqeosXmGkxNOatQJo32jUKJBTYY3ZNZbhPKXep+64W40M9uq+yH553X3AY9wVFLB0ZZwxg9yz+2cqZHZsUgSp6EiD8g0mOXMEVZHWAHAdqnb1zYpFYe1HuBZVURmJnwECm6ULHcm3YhHpMcRXEwHFZyY6aj/p2Hxf+6SdLqIyUpkkhiXM3NclX9yzD1ZuI2IDCC406nAD1jsc1vBBs6YVA0ws9O++uT9WFbVW3axLVrVehpFIlLhGEdyUscb/PfhjmUeEWtSKKWhUcFM/TbQGecEXNgZMAL/1dlHVpzo8ico+Dv0gSViDPHov5pQHLvPhLtvmCvGbTtJ379Pnmu9idMnYuUChNZggJK1lMT5E2qib/DobkwPdRzSOMUGskxxkofnC9s/1CFhM6DWlCY8cPMq5TIQ1ne8S2MlLcQjHUdipi23Uc9/TCC0OY1G8iObyAyhVqg+m1DAfNqDgwYcIPeEE1kjL6MhdJP6B/NjhN/RWmRtiImBafYCeIMR5SIQ3jxnjuu3Ftb1y9lsYV9bJ0q8UgZbXudCx76kk6+y1b54nYTbRb5SmYjF29/rlckmbSFFg/YI7k7Pe/C5gDw1MYy1ERcySP1r8j5oX6qMCKUSFzBHU6spODzJHZ1pXZw7gyETL3PC2QOSh34WmNmdMaI4Dm6H0u/BDC7A4Wlz0BmnsjT2Yo2aETNA+MBcZQFbZZtgWcoRIqr+3CvsESKqR394yGHzoB0By1XanXOaXq1quqPbokKLlIhfrDyzOcZrBDY4dmY44HWo9kBkTQ2rN0g9ZI6wLfU/Pfblsj0Mvk82NBa3ds/y1PzHptoPVIei2C1q4CAhgXtEYyADbDyV3P6xf1BE+uYpvAcYOnsuF3q9nCapqtV4kNlMKLaLQoFgObSVNG45pYtIwIddo7wQ+8QLnDpI9NgOBASmnrh6FNKDH6PwRDi/CP9nnbprxDzzsK3YjwtYHQ5ivaqO4EQGizbfJvDoTbiSA0bGLQwDVbebTOKDQ6EoPmru9geWkSV6P8MIi1qXnb2NOK3dpKvjXcgrsnEOupaCGHDuVkeG10xHrs+E5fkqUXmjo1hNpsC051TrO69aoMGh1CqIvs6LJQoX7RanBMY9rR6ZFUXvTG1WrjZ9HpwTK698Xx7Q1AiUE+76t72Y3nWHR6CkZ21iU/XjE8PZJii/D0VLED3LjwtCVjYv3D05qQsCnQPrhvv/lZOVrfjGE5LS2t1i0tLXndwu3NZRNvOxTbpFs/LmkVVHd2s5tHYbr+wFc4fKxFQvUKvSoxBoGFHVXPeraD/L72BULNfYEc1RCTM6oKa91a4SgVpic3OAnpczM/+mrCobb5kNV1hVk3LZXzoV2qCDPK7bZ2m+h8wZYc7aHv3/w5jm5IGubIA7qakywjm4mMjedbijV26tplURhTpSpfNWn0o06OIbw8R/H2QqTQJnswbZKBmxmOg0m1WosJCmHfTH+5mVN0wpsYj0ACSOuo9gNW2brBhiNs2VPdJOQuDFi3NHbiKzqJNIZP3+i+57bwgijYdqu6F3QTPd2/tbXIv/Zvv0XX/wc=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go new file mode 100644 index 00000000..3b771746 --- /dev/null +++ b/plugins/broadcast/plugin.go @@ -0,0 +1,119 @@ +package broadcast + +import ( + "sync" + + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "google.golang.org/protobuf/proto" +) + +const PluginName string = "broadcast" + +type Plugin struct { + sync.RWMutex + log logger.Logger + // publishers implement Publisher interface + // and able to receive a payload + publishers map[string]pubsub.Publisher +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("broadcast_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + p.publishers = make(map[string]pubsub.Publisher) + p.log = log + return nil +} + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.CollectPublishers, + } +} + +// CollectPublishers collect all plugins who implement pubsub.Publisher interface +func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) { + p.publishers[name.Name()] = subscriber +} + +// Publish is an entry point to the websocket PUBSUB +func (p *Plugin) Publish(m []byte) error { + p.Lock() + defer p.Unlock() + + const op = errors.Op("broadcast_plugin_publish") + + msg := &websocketsv1.Message{} + err := proto.Unmarshal(m, msg) + if err != nil { + return errors.E(op, err) + } + + // Get payload + for i := 0; i < len(msg.GetTopics()); i++ { + if len(p.publishers) > 0 { + for j := range p.publishers { + err = p.publishers[j].Publish(m) + if err != nil { + return errors.E(op, err) + } + } + + return nil + } + + p.log.Warn("no publishers registered") + } + + return nil +} + +func (p *Plugin) PublishAsync(m []byte) { + go func() { + p.Lock() + defer p.Unlock() + msg := &websocketsv1.Message{} + err := proto.Unmarshal(m, msg) + if err != nil { + p.log.Error("message unmarshal") + } + + // Get payload + for i := 0; i < len(msg.GetTopics()); i++ { + if br, ok := p.publishers[msg.GetBroker()]; ok { + err := br.Publish(m) + if err != nil { + p.log.Error("publish async error", "error", err) + } + } else { + p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker()) + } + } + }() +} + +func (p *Plugin) GetDriver(key string) pubsub.SubReader { + println(key) + return nil +} + +func (p *Plugin) RPC() interface{} { + return &rpc{ + plugin: p, + log: p.log, + } +} + +func (p *Plugin) Name() string { + return PluginName +} + +func (p *Plugin) Available() {} diff --git a/plugins/websockets/rpc.go b/plugins/broadcast/rpc.go index 341e0b2a..fa853421 100644 --- a/plugins/websockets/rpc.go +++ b/plugins/broadcast/rpc.go @@ -1,4 +1,4 @@ -package websockets +package broadcast import ( "github.com/spiral/errors" diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index 5aedd5c3..fd906041 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -36,6 +36,6 @@ type StorageDriver interface { // Provider provides storage based on the config type Provider interface { - // Provide provides Storage based on the config key + // KVProvide provides Storage based on the config key KVProvide(key string) (Storage, error) } diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go index deb4406c..b1d5d0a8 100644 --- a/plugins/websockets/config.go +++ b/plugins/websockets/config.go @@ -8,55 +8,16 @@ import ( ) /* -# GLOBAL -redis: - addrs: - - 'localhost:6379' - websockets: - # pubsubs should implement PubSub interface to be collected via endure.Collects - - pubsubs:["redis", "amqp", "memory"] - # OR local - redis: - addrs: - - 'localhost:6379' - - # path used as websockets path + broker: default + allowed_origin: "*" path: "/ws" */ -type RedisConfig struct { - Addrs []string `mapstructure:"addrs"` - DB int `mapstructure:"db"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - MasterName string `mapstructure:"master_name"` - SentinelPassword string `mapstructure:"sentinel_password"` - RouteByLatency bool `mapstructure:"route_by_latency"` - RouteRandomly bool `mapstructure:"route_randomly"` - MaxRetries int `mapstructure:"max_retries"` - DialTimeout time.Duration `mapstructure:"dial_timeout"` - MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` - MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` - PoolSize int `mapstructure:"pool_size"` - MinIdleConns int `mapstructure:"min_idle_conns"` - MaxConnAge time.Duration `mapstructure:"max_conn_age"` - ReadTimeout time.Duration `mapstructure:"read_timeout"` - WriteTimeout time.Duration `mapstructure:"write_timeout"` - PoolTimeout time.Duration `mapstructure:"pool_timeout"` - IdleTimeout time.Duration `mapstructure:"idle_timeout"` - IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` - ReadOnly bool `mapstructure:"read_only"` -} - // Config represents configuration for the ws plugin type Config struct { // http path for the websocket Path string `mapstructure:"path"` - // ["redis", "amqp", "memory"] - PubSubs []string `mapstructure:"pubsubs"` - Middleware []string `mapstructure:"middleware"` AllowedOrigin string `mapstructure:"allowed_origin"` @@ -65,8 +26,8 @@ type Config struct { allowedOrigins []string allowedAll bool - Redis *RedisConfig `mapstructure:"redis"` - Pool *pool.Config `mapstructure:"pool"` + // Pool with the workers for the websockets + Pool *pool.Config `mapstructure:"pool"` } // InitDefault initialize default values for the ws config @@ -75,11 +36,6 @@ func (c *Config) InitDefault() { c.Path = "/ws" } - if len(c.PubSubs) == 0 { - // memory used by default - c.PubSubs = append(c.PubSubs, "memory") - } - if c.Pool == nil { c.Pool = &pool.Config{} if c.Pool.NumWorkers == 0 { @@ -99,13 +55,6 @@ func (c *Config) InitDefault() { } } - if c.Redis != nil { - if c.Redis.Addrs == nil { - // append default - c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379") - } - } - if c.AllowedOrigin == "" { c.AllowedOrigin = "*" } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 5f904d26..07f22043 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -29,7 +29,7 @@ type Executor struct { connID string // map with the pubsub drivers - pubsub map[string]pubsub.PubSub + pubsub map[string]pubsub.Subscriber actualTopics map[string]struct{} req *http.Request @@ -38,7 +38,7 @@ type Executor struct { // NewExecutor creates protected connection and starts command loop func NewExecutor(conn *connection.Connection, log logger.Logger, - connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor { + connID string, pubsubs map[string]pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor { return &Executor{ conn: conn, connID: connID, @@ -170,7 +170,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit } } -func (e *Executor) Set(br pubsub.PubSub, topics []string) error { +func (e *Executor) Set(br pubsub.Subscriber, topics []string) error { // associate connection with topics err := br.Subscribe(e.connID, topics...) if err != nil { @@ -188,7 +188,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error { return nil } -func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { +func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error { // remove associated connections from the storage err := br.Unsubscribe(e.connID, topics...) if err != nil { diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 8b708187..cf861c72 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -2,7 +2,6 @@ package websockets import ( "context" - "fmt" "net/http" "sync" "time" @@ -12,10 +11,10 @@ import ( json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/interface/broadcast" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" @@ -26,7 +25,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/executor" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" - "google.golang.org/protobuf/proto" ) const ( @@ -36,9 +34,11 @@ const ( type Plugin struct { sync.RWMutex // Collection with all available pubsubs - pubsubs map[string]pubsub.PubSub + //pubsubs map[string]pubsub.PubSub - psProviders map[string]pubsub.PSProvider + //psProviders map[string]pubsub.PSProvider + + subReaders map[string]pubsub.SubReader cfg *Config cfgPlugin config.Configurer @@ -60,7 +60,7 @@ type Plugin struct { accessValidator validator.AccessValidatorFn } -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error { const op = errors.Op("websockets_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) @@ -72,8 +72,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } p.cfg.InitDefault() - p.pubsubs = make(map[string]pubsub.PubSub) - p.psProviders = make(map[string]pubsub.PSProvider) + //p.pubsubs = make(map[string]pubsub.PubSub) + //p.psProviders = make(map[string]pubsub.PSProvider) + + p.subReaders = make(map[string]pubsub.SubReader) p.log = log p.cfgPlugin = cfg @@ -96,11 +98,11 @@ func (p *Plugin) Serve() chan error { errCh := make(chan error, 1) const op = errors.Op("websockets_plugin_serve") - err := p.initPubSubs() - if err != nil { - errCh <- errors.E(op, err) - return errCh - } + //err := p.initPubSubs() + //if err != nil { + // errCh <- errors.E(op, err) + // return errCh + //} go func() { var err error @@ -122,11 +124,11 @@ func (p *Plugin) Serve() chan error { p.accessValidator = p.defaultAccessValidator(p.phpPool) }() - p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log) + p.workersPool = pool.NewWorkersPool(p.subReaders, &p.connections, p.log) // run all pubsubs drivers - for _, v := range p.pubsubs { - go func(ps pubsub.PubSub) { + for _, v := range p.subReaders { + go func(ps pubsub.SubReader) { for { select { case <-p.serveExit: @@ -146,53 +148,53 @@ func (p *Plugin) Serve() chan error { return errCh } -func (p *Plugin) initPubSubs() error { - for i := 0; i < len(p.cfg.PubSubs); i++ { - // don't need to have a section for the in-memory - if p.cfg.PubSubs[i] == "memory" { - if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { - r, err := provider.PSProvide("") - if err != nil { - return err - } - - // append default in-memory provider - p.pubsubs["memory"] = r - } - continue - } - // key - memory, redis - if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { - // try local key - switch { - // try local config first - case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])): - r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])) - if err != nil { - return err - } - - // append redis provider - p.pubsubs[p.cfg.PubSubs[i]] = r - case p.cfgPlugin.Has(p.cfg.PubSubs[i]): - r, err := provider.PSProvide(p.cfg.PubSubs[i]) - if err != nil { - return err - } - - // append redis provider - p.pubsubs[p.cfg.PubSubs[i]] = r - default: - return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i]) - } - } else { - // no such driver - p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders) - } - } - - return nil -} +//func (p *Plugin) initPubSubs() error { +// for i := 0; i < len(p.cfg.PubSubs); i++ { +// // don't need to have a section for the in-memory +// if p.cfg.PubSubs[i] == "memory" { +// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { +// r, err := provider.PSProvide("") +// if err != nil { +// return err +// } +// +// // append default in-memory provider +// p.pubsubs["memory"] = r +// } +// continue +// } +// // key - memory, redis +// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { +// // try local key +// switch { +// // try local config first +// case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])): +// r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])) +// if err != nil { +// return err +// } +// +// // append redis provider +// p.pubsubs[p.cfg.PubSubs[i]] = r +// case p.cfgPlugin.Has(p.cfg.PubSubs[i]): +// r, err := provider.PSProvide(p.cfg.PubSubs[i]) +// if err != nil { +// return err +// } +// +// // append redis provider +// p.pubsubs[p.cfg.PubSubs[i]] = r +// default: +// return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i]) +// } +// } else { +// // no such driver +// p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders) +// } +// } +// +// return nil +//} func (p *Plugin) Stop() error { // close workers pool @@ -210,26 +212,19 @@ func (p *Plugin) Stop() error { func (p *Plugin) Collects() []interface{} { return []interface{}{ - p.GetPublishers, + p.GetSubsReader, } } func (p *Plugin) Available() {} -func (p *Plugin) RPC() interface{} { - return &rpc{ - plugin: p, - log: p.log, - } -} - func (p *Plugin) Name() string { return PluginName } -// GetPublishers collects all pubsubs -func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) { - p.psProviders[name.Name()] = pub +// GetSubsReader collects all plugins which implement SubReader interface +func (p *Plugin) GetSubsReader(name endure.Named, pub pubsub.SubReader) { + p.subReaders[name.Name()] = pub } func (p *Plugin) Middleware(next http.Handler) http.Handler { @@ -277,7 +272,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { p.connections.Store(connectionID, safeConn) // Executor wraps a connection to have a safe abstraction - e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r) + e := executor.NewExecutor(safeConn, p.log, connectionID, nil, p.accessValidator, r) p.log.Info("websocket client connected", "uuid", connectionID) err = e.StartCommandLoop() @@ -361,55 +356,6 @@ func (p *Plugin) Reset() error { return nil } -// Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(m []byte) error { - p.Lock() - defer p.Unlock() - - msg := &websocketsv1.Message{} - err := proto.Unmarshal(m, msg) - if err != nil { - return err - } - - // Get payload - for i := 0; i < len(msg.GetTopics()); i++ { - if br, ok := p.pubsubs[msg.GetBroker()]; ok { - err := br.Publish(m) - if err != nil { - return errors.E(err) - } - } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) - } - } - return nil -} - -func (p *Plugin) PublishAsync(m []byte) { - go func() { - p.Lock() - defer p.Unlock() - msg := &websocketsv1.Message{} - err := proto.Unmarshal(m, msg) - if err != nil { - p.log.Error("message unmarshal") - } - - // Get payload - for i := 0; i < len(msg.GetTopics()); i++ { - if br, ok := p.pubsubs[msg.GetBroker()]; ok { - err := br.Publish(m) - if err != nil { - p.log.Error("publish async error", "error", err) - } - } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker()) - } - } - }() -} - func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) { const op = errors.Op("access_validator") diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index a196d1f0..22042d8d 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -12,7 +12,7 @@ import ( ) type WorkersPool struct { - storage map[string]pubsub.PubSub + storage map[string]pubsub.SubReader connections *sync.Map resPool sync.Pool log logger.Logger @@ -22,7 +22,7 @@ type WorkersPool struct { } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(pubsubs map[string]pubsub.SubReader, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, queue: make(chan *websocketsv1.Message, 100), diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go new file mode 100644 index 00000000..65ee4415 --- /dev/null +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -0,0 +1,207 @@ +package broadcast + +import ( + "net" + "net/http" + "net/rpc" + "net/url" + "os" + "os/signal" + "sync" + "syscall" + "testing" + "time" + + "github.com/fasthttp/websocket" + json "github.com/json-iterator/go" + endure "github.com/spiral/endure/pkg/container" + goridgeRpc "github.com/spiral/goridge/v3/pkg/rpc" + websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/plugins/broadcast" + "github.com/spiral/roadrunner/v2/plugins/config" + httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/memory" + "github.com/spiral/roadrunner/v2/plugins/redis" + rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" + "github.com/spiral/roadrunner/v2/plugins/server" + "github.com/spiral/roadrunner/v2/plugins/websockets" + "github.com/spiral/roadrunner/v2/utils" + "github.com/stretchr/testify/assert" +) + +func TestBroadcastInit(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-broadcast-init.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &broadcast.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + ch, err := cont.Serve() + if err != nil { + t.Fatal(err) + } + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + + wg := &sync.WaitGroup{} + wg.Add(1) + + stopCh := make(chan struct{}, 1) + + go func() { + defer wg.Done() + for { + select { + case e := <-ch: + assert.Fail(t, "error", e.Error.Error()) + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + case <-sig: + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + case <-stopCh: + // timeout + err = cont.Stop() + if err != nil { + assert.FailNow(t, "error", err.Error()) + } + return + } + } + }() + + time.Sleep(time.Second * 1) + //t.Run("TestWSInit", wsInit) + + stopCh <- struct{}{} + + wg.Wait() +} + +func wsInit(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + _ = resp.Body.Close() + }() + + d, err := json.Marshal(messageWS("join", "memory", []byte("hello websockets"), "foo", "foo2")) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) +} + +func publishAsync(t *testing.T, command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP"), topics...), ret) + assert.NoError(t, err) + assert.True(t, ret.Ok) +} + +func publishAsync2(t *testing.T, command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("websockets.PublishAsync", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) + assert.NoError(t, err) + assert.True(t, ret.Ok) +} + +func publish2(t *testing.T, command string, broker string, topics ...string) { + conn, err := net.Dial("tcp", "127.0.0.1:6001") + if err != nil { + panic(err) + } + + client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn)) + + ret := &websocketsv1.Response{} + err = client.Call("websockets.Publish", makeMessage(command, broker, []byte("hello, PHP2"), topics...), ret) + assert.NoError(t, err) + assert.True(t, ret.Ok) +} + +func messageWS(command string, broker string, payload []byte, topics ...string) *websocketsv1.Message { + return &websocketsv1.Message{ + Topics: topics, + Command: command, + Broker: broker, + Payload: payload, + } +} + +func makeMessage(command string, broker string, payload []byte, topics ...string) *websocketsv1.Request { + m := &websocketsv1.Request{ + Messages: []*websocketsv1.Message{ + { + Topics: topics, + Command: command, + Broker: broker, + Payload: payload, + }, + }, + } + + return m +} diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml new file mode 100644 index 00000000..fa4116d0 --- /dev/null +++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml @@ -0,0 +1,43 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:11111 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +redis: + addrs: + - "localhost:6379" + +broadcast: + default: + driver: redis + test: + driver: memory + +websockets: + pubsubs: [ "redis" ] + path: "/ws" + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error |