summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
committerValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
commit68ff941c4226074206ceed9c30bd95317aa0e9fc (patch)
tree693306256281cccefb29f4eedb7f617a9022154e /plugins
parent25e0841c6aa5e2686da5b9f74e3d77d3814ff592 (diff)
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/broadcast/config.go19
-rw-r--r--plugins/broadcast/doc/broadcast_arch.drawio1
-rw-r--r--plugins/broadcast/plugin.go119
-rw-r--r--plugins/broadcast/rpc.go (renamed from plugins/websockets/rpc.go)2
-rw-r--r--plugins/kv/interface.go2
-rw-r--r--plugins/websockets/config.go59
-rw-r--r--plugins/websockets/executor/executor.go8
-rw-r--r--plugins/websockets/plugin.go194
-rw-r--r--plugins/websockets/pool/workers_pool.go4
9 files changed, 221 insertions, 187 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
new file mode 100644
index 00000000..18846f30
--- /dev/null
+++ b/plugins/broadcast/config.go
@@ -0,0 +1,19 @@
+package broadcast
+
+/*
+websockets: # <----- one of possible subscribers
+ path: /ws
+ broker: default # <------ broadcast broker to use --------------- |
+ | match
+broadcast: # <-------- broadcast entry point plugin |
+ default: # <----------------------------------------------------- |
+ driver: redis
+ test:
+ driver: memory
+
+*/
+
+// Config ...
+type Config struct {
+ Data map[string]interface{} `mapstructure:"broadcast"`
+}
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..b8d2947e
--- /dev/null
+++ b/plugins/broadcast/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-06-17T16:23:35.917Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="1VvfJYAxL9mW7TkXHKVj" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5s4F/41nml3xhkkcf2YW7ud2U6z8buz7acdbGSbLUZewLnsr38lEBgkOcHhojibtHHgIMlwdK6PLkzQ5ebhc+Jv119JgKMJNIKHCbqaQAhNaNM/jPJYUBDyYEFZJWFQ0MCeMAv/xZxocOouDHDaKJgREmXhtklckDjGi6xB85OE3DeLLUnU/Natv8ISYbbwI5n6Zxhka/5ghmHsL/yKw9U6E69s/LI0J6RrPyD3NRK6nqDLhJCsONo8XOKIsa9kTFHv04Gr1Z0lOM7aVLg0bfvLxed46nxH01l8t/5OPk9ts2jmzo92/JH53WaPJQ8SsosDzFoxJujifh1meLb1F+zqPe12Sltnm4ieAXq4DKPokkQkyeuiwMfuckHpaZaQn7h2xV64eL6kV/gN4CTDDwcfDVQMo7KGyQZnySMtwisgl/OYi5lp8fP7Rp8VtHW9uzxO9LmcrKq295ykB5yZRzDWsodl7NJi/5SMzX9YDRJnNXrx0w/DgQGaHHccmeOmq+A4sq2hOG69bY7DJsenlUBrZPnA1kM3ywUhR4alm+OexF8cUIfFT0mSrcmKxH50vadeNHtgX+Y3Qrac73/jLHvk3tffZaTZK/ghzL6z6mcWP/vBG2PHVw/1k8fyJKaPW6vETn+U7bGTfbX8rKxXPB97qKd7jfKA7JIFfoJXZbzgJyucPVXOUItBgiM/C++aN6LqUV71hoT0FivxcaBxBkzoWg7/bMiSZQiep7hN3oYgJdVNvVxwSonUJTmwLjqgpejUBacmRwdERzQe7gIvlKHH3LVYhKBF2IqAq6uwnSeJ/1grsGVikx6WRdu1GtJnm0KsKJYH3cq74OnyyDA6lbc8S1CSgiO9qgyQnNvF7bfzq8vz2f/69XJtBbWTN7MtgYWKiA2YCmdmDRUimzotEqiZo71fO84gwWMt0nIJ1R0d2HPbsvu1SLCt++tokDrJAJR07Pb66suMkm7+uJj9cdGvoi2xfYD/jjc3elI0UwjUqwixrmhwTEXT6vr7ULRXrWfoFPQMSXr29frrt9sfb0nRlB5tVEVzTl3RnFetafYpaJqM+51//f1mgj6x/29I22xbt7a5Eqfpo0dsFIB+WV7dv/PDyJ9TJtMn3c3T3ZweBAmVgSSVuoCyJmvy2Y/CVUyPF5RFmLL0gjEwXPjROb+wCYOgUGKchv/mX1R0IM/HaLvWxcS6Ym1RvU0LFQZSV8UkxsMg4ZYp95IaCR+qlwDSaRRPDbIy2po4rTauvM16apwQP1j4KVO+kKnLklmvPs1cYGE3MFVmzoVzZNv9KFBl1o6N3s3BFMh+V6D2CtQa9LU6KtCLcDhHcKLl+OMh3Mtz3E7l7WfKA+AZT1UYBlizoF6JdtqLtA7RNLva9peJpoMakgDcI0VHqDCQ6GiNJpopltM2x3r91hB6OkTOFSTI5LHiIZETywNgHSujQo2BBg7kkYMJtCOWXWwbsmv/s2Ozfy42tH9CmlCc06vG9oF+5nJgFPRpxsSUXTNr11i6MuUZCrvGk5SqTXq04n/9DYusIvmsFqhxcrNKeZYXnie1JgtKSaiCv/wG+Bd9movlKW0r0tZSq/2xiF+L8DJ7nn95KYl7n3F2leeLH37ix/zGkjBefaRHs938FvtB44lrTycYqXTtb9lhkk9Sq9sWQhtfRvnEMBbwHgiLX4q7dIuFhVE9pEj5q5l69VjYGSwWfp//cIT1t9pa/2EmQEjW1xKhoIHnPAB5RtifeJ6SxU+cyfhPl9x0DoJgaaj0ERgO8noCdwBAIuLtnMlTkpTpKRpqRhJwJCZ/icPsA4PaKr/wUeL2m0DbpEl5NlSgBePCbTIqeosXmGkxNOatQJo32jUKJBTYY3ZNZbhPKXep+64W40M9uq+yH553X3AY9wVFLB0ZZwxg9yz+2cqZHZsUgSp6EiD8g0mOXMEVZHWAHAdqnb1zYpFYe1HuBZVURmJnwECm6ULHcm3YhHpMcRXEwHFZyY6aj/p2Hxf+6SdLqIyUpkkhiXM3NclX9yzD1ZuI2IDCC406nAD1jsc1vBBs6YVA0ws9O++uT9WFbVW3axLVrVehpFIlLhGEdyUscb/PfhjmUeEWtSKKWhUcFM/TbQGecEXNgZMAL/1dlHVpzo8ico+Dv0gSViDPHov5pQHLvPhLtvmCvGbTtJ379Pnmu9idMnYuUChNZggJK1lMT5E2qib/DobkwPdRzSOMUGskxxkofnC9s/1CFhM6DWlCY8cPMq5TIQ1ne8S2MlLcQjHUdipi23Uc9/TCC0OY1G8iObyAyhVqg+m1DAfNqDgwYcIPeEE1kjL6MhdJP6B/NjhN/RWmRtiImBafYCeIMR5SIQ3jxnjuu3Ftb1y9lsYV9bJ0q8UgZbXudCx76kk6+y1b54nYTbRb5SmYjF29/rlckmbSFFg/YI7k7Pe/C5gDw1MYy1ERcySP1r8j5oX6qMCKUSFzBHU6spODzJHZ1pXZw7gyETL3PC2QOSh34WmNmdMaI4Dm6H0u/BDC7A4Wlz0BmnsjT2Yo2aETNA+MBcZQFbZZtgWcoRIqr+3CvsESKqR394yGHzoB0By1XanXOaXq1quqPbokKLlIhfrDyzOcZrBDY4dmY44HWo9kBkTQ2rN0g9ZI6wLfU/Pfblsj0Mvk82NBa3ds/y1PzHptoPVIei2C1q4CAhgXtEYyADbDyV3P6xf1BE+uYpvAcYOnsuF3q9nCapqtV4kNlMKLaLQoFgObSVNG45pYtIwIddo7wQ+8QLnDpI9NgOBASmnrh6FNKDH6PwRDi/CP9nnbprxDzzsK3YjwtYHQ5ivaqO4EQGizbfJvDoTbiSA0bGLQwDVbebTOKDQ6EoPmru9geWkSV6P8MIi1qXnb2NOK3dpKvjXcgrsnEOupaCGHDuVkeG10xHrs+E5fkqUXmjo1hNpsC051TrO69aoMGh1CqIvs6LJQoX7RanBMY9rR6ZFUXvTG1WrjZ9HpwTK698Xx7Q1AiUE+76t72Y3nWHR6CkZ21iU/XjE8PZJii/D0VLED3LjwtCVjYv3D05qQsCnQPrhvv/lZOVrfjGE5LS2t1i0tLXndwu3NZRNvOxTbpFs/LmkVVHd2s5tHYbr+wFc4fKxFQvUKvSoxBoGFHVXPeraD/L72BULNfYEc1RCTM6oKa91a4SgVpic3OAnpczM/+mrCobb5kNV1hVk3LZXzoV2qCDPK7bZ2m+h8wZYc7aHv3/w5jm5IGubIA7qakywjm4mMjedbijV26tplURhTpSpfNWn0o06OIbw8R/H2QqTQJnswbZKBmxmOg0m1WosJCmHfTH+5mVN0wpsYj0ACSOuo9gNW2brBhiNs2VPdJOQuDFi3NHbiKzqJNIZP3+i+57bwgijYdqu6F3QTPd2/tbXIv/Zvv0XX/wc=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
new file mode 100644
index 00000000..3b771746
--- /dev/null
+++ b/plugins/broadcast/plugin.go
@@ -0,0 +1,119 @@
+package broadcast
+
+import (
+ "sync"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
+)
+
+const PluginName string = "broadcast"
+
+type Plugin struct {
+ sync.RWMutex
+ log logger.Logger
+ // publishers implement Publisher interface
+ // and able to receive a payload
+ publishers map[string]pubsub.Publisher
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("broadcast_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ p.publishers = make(map[string]pubsub.Publisher)
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectPublishers,
+ }
+}
+
+// CollectPublishers collect all plugins who implement pubsub.Publisher interface
+func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) {
+ p.publishers[name.Name()] = subscriber
+}
+
+// Publish is an entry point to the websocket PUBSUB
+func (p *Plugin) Publish(m []byte) error {
+ p.Lock()
+ defer p.Unlock()
+
+ const op = errors.Op("broadcast_plugin_publish")
+
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err = p.publishers[j].Publish(m)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+ }
+
+ p.log.Warn("no publishers registered")
+ }
+
+ return nil
+}
+
+func (p *Plugin) PublishAsync(m []byte) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ p.log.Error("message unmarshal")
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.publishers[msg.GetBroker()]; ok {
+ err := br.Publish(m)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker())
+ }
+ }
+ }()
+}
+
+func (p *Plugin) GetDriver(key string) pubsub.SubReader {
+ println(key)
+ return nil
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
diff --git a/plugins/websockets/rpc.go b/plugins/broadcast/rpc.go
index 341e0b2a..fa853421 100644
--- a/plugins/websockets/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -1,4 +1,4 @@
-package websockets
+package broadcast
import (
"github.com/spiral/errors"
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index 5aedd5c3..fd906041 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -36,6 +36,6 @@ type StorageDriver interface {
// Provider provides storage based on the config
type Provider interface {
- // Provide provides Storage based on the config key
+ // KVProvide provides Storage based on the config key
KVProvide(key string) (Storage, error)
}
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
index deb4406c..b1d5d0a8 100644
--- a/plugins/websockets/config.go
+++ b/plugins/websockets/config.go
@@ -8,55 +8,16 @@ import (
)
/*
-# GLOBAL
-redis:
- addrs:
- - 'localhost:6379'
-
websockets:
- # pubsubs should implement PubSub interface to be collected via endure.Collects
-
- pubsubs:["redis", "amqp", "memory"]
- # OR local
- redis:
- addrs:
- - 'localhost:6379'
-
- # path used as websockets path
+ broker: default
+ allowed_origin: "*"
path: "/ws"
*/
-type RedisConfig struct {
- Addrs []string `mapstructure:"addrs"`
- DB int `mapstructure:"db"`
- Username string `mapstructure:"username"`
- Password string `mapstructure:"password"`
- MasterName string `mapstructure:"master_name"`
- SentinelPassword string `mapstructure:"sentinel_password"`
- RouteByLatency bool `mapstructure:"route_by_latency"`
- RouteRandomly bool `mapstructure:"route_randomly"`
- MaxRetries int `mapstructure:"max_retries"`
- DialTimeout time.Duration `mapstructure:"dial_timeout"`
- MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
- MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
- PoolSize int `mapstructure:"pool_size"`
- MinIdleConns int `mapstructure:"min_idle_conns"`
- MaxConnAge time.Duration `mapstructure:"max_conn_age"`
- ReadTimeout time.Duration `mapstructure:"read_timeout"`
- WriteTimeout time.Duration `mapstructure:"write_timeout"`
- PoolTimeout time.Duration `mapstructure:"pool_timeout"`
- IdleTimeout time.Duration `mapstructure:"idle_timeout"`
- IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
- ReadOnly bool `mapstructure:"read_only"`
-}
-
// Config represents configuration for the ws plugin
type Config struct {
// http path for the websocket
Path string `mapstructure:"path"`
- // ["redis", "amqp", "memory"]
- PubSubs []string `mapstructure:"pubsubs"`
- Middleware []string `mapstructure:"middleware"`
AllowedOrigin string `mapstructure:"allowed_origin"`
@@ -65,8 +26,8 @@ type Config struct {
allowedOrigins []string
allowedAll bool
- Redis *RedisConfig `mapstructure:"redis"`
- Pool *pool.Config `mapstructure:"pool"`
+ // Pool with the workers for the websockets
+ Pool *pool.Config `mapstructure:"pool"`
}
// InitDefault initialize default values for the ws config
@@ -75,11 +36,6 @@ func (c *Config) InitDefault() {
c.Path = "/ws"
}
- if len(c.PubSubs) == 0 {
- // memory used by default
- c.PubSubs = append(c.PubSubs, "memory")
- }
-
if c.Pool == nil {
c.Pool = &pool.Config{}
if c.Pool.NumWorkers == 0 {
@@ -99,13 +55,6 @@ func (c *Config) InitDefault() {
}
}
- if c.Redis != nil {
- if c.Redis.Addrs == nil {
- // append default
- c.Redis.Addrs = append(c.Redis.Addrs, "localhost:6379")
- }
- }
-
if c.AllowedOrigin == "" {
c.AllowedOrigin = "*"
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 5f904d26..07f22043 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -29,7 +29,7 @@ type Executor struct {
connID string
// map with the pubsub drivers
- pubsub map[string]pubsub.PubSub
+ pubsub map[string]pubsub.Subscriber
actualTopics map[string]struct{}
req *http.Request
@@ -38,7 +38,7 @@ type Executor struct {
// NewExecutor creates protected connection and starts command loop
func NewExecutor(conn *connection.Connection, log logger.Logger,
- connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor {
+ connID string, pubsubs map[string]pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
@@ -170,7 +170,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
}
-func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Set(br pubsub.Subscriber, topics []string) error {
// associate connection with topics
err := br.Subscribe(e.connID, topics...)
if err != nil {
@@ -188,7 +188,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
return nil
}
-func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
+func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error {
// remove associated connections from the storage
err := br.Unsubscribe(e.connID, topics...)
if err != nil {
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 8b708187..cf861c72 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -2,7 +2,6 @@ package websockets
import (
"context"
- "fmt"
"net/http"
"sync"
"time"
@@ -12,10 +11,10 @@ import (
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/broadcast"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -26,7 +25,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "google.golang.org/protobuf/proto"
)
const (
@@ -36,9 +34,11 @@ const (
type Plugin struct {
sync.RWMutex
// Collection with all available pubsubs
- pubsubs map[string]pubsub.PubSub
+ //pubsubs map[string]pubsub.PubSub
- psProviders map[string]pubsub.PSProvider
+ //psProviders map[string]pubsub.PSProvider
+
+ subReaders map[string]pubsub.SubReader
cfg *Config
cfgPlugin config.Configurer
@@ -60,7 +60,7 @@ type Plugin struct {
accessValidator validator.AccessValidatorFn
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error {
const op = errors.Op("websockets_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -72,8 +72,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.cfg.InitDefault()
- p.pubsubs = make(map[string]pubsub.PubSub)
- p.psProviders = make(map[string]pubsub.PSProvider)
+ //p.pubsubs = make(map[string]pubsub.PubSub)
+ //p.psProviders = make(map[string]pubsub.PSProvider)
+
+ p.subReaders = make(map[string]pubsub.SubReader)
p.log = log
p.cfgPlugin = cfg
@@ -96,11 +98,11 @@ func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("websockets_plugin_serve")
- err := p.initPubSubs()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
+ //err := p.initPubSubs()
+ //if err != nil {
+ // errCh <- errors.E(op, err)
+ // return errCh
+ //}
go func() {
var err error
@@ -122,11 +124,11 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
- p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log)
+ p.workersPool = pool.NewWorkersPool(p.subReaders, &p.connections, p.log)
// run all pubsubs drivers
- for _, v := range p.pubsubs {
- go func(ps pubsub.PubSub) {
+ for _, v := range p.subReaders {
+ go func(ps pubsub.SubReader) {
for {
select {
case <-p.serveExit:
@@ -146,53 +148,53 @@ func (p *Plugin) Serve() chan error {
return errCh
}
-func (p *Plugin) initPubSubs() error {
- for i := 0; i < len(p.cfg.PubSubs); i++ {
- // don't need to have a section for the in-memory
- if p.cfg.PubSubs[i] == "memory" {
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- r, err := provider.PSProvide("")
- if err != nil {
- return err
- }
-
- // append default in-memory provider
- p.pubsubs["memory"] = r
- }
- continue
- }
- // key - memory, redis
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- // try local key
- switch {
- // try local config first
- case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
- r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
- r, err := provider.PSProvide(p.cfg.PubSubs[i])
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- default:
- return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
- }
- } else {
- // no such driver
- p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
- }
- }
-
- return nil
-}
+//func (p *Plugin) initPubSubs() error {
+// for i := 0; i < len(p.cfg.PubSubs); i++ {
+// // don't need to have a section for the in-memory
+// if p.cfg.PubSubs[i] == "memory" {
+// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+// r, err := provider.PSProvide("")
+// if err != nil {
+// return err
+// }
+//
+// // append default in-memory provider
+// p.pubsubs["memory"] = r
+// }
+// continue
+// }
+// // key - memory, redis
+// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+// // try local key
+// switch {
+// // try local config first
+// case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
+// r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
+// if err != nil {
+// return err
+// }
+//
+// // append redis provider
+// p.pubsubs[p.cfg.PubSubs[i]] = r
+// case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
+// r, err := provider.PSProvide(p.cfg.PubSubs[i])
+// if err != nil {
+// return err
+// }
+//
+// // append redis provider
+// p.pubsubs[p.cfg.PubSubs[i]] = r
+// default:
+// return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
+// }
+// } else {
+// // no such driver
+// p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
+// }
+// }
+//
+// return nil
+//}
func (p *Plugin) Stop() error {
// close workers pool
@@ -210,26 +212,19 @@ func (p *Plugin) Stop() error {
func (p *Plugin) Collects() []interface{} {
return []interface{}{
- p.GetPublishers,
+ p.GetSubsReader,
}
}
func (p *Plugin) Available() {}
-func (p *Plugin) RPC() interface{} {
- return &rpc{
- plugin: p,
- log: p.log,
- }
-}
-
func (p *Plugin) Name() string {
return PluginName
}
-// GetPublishers collects all pubsubs
-func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) {
- p.psProviders[name.Name()] = pub
+// GetSubsReader collects all plugins which implement SubReader interface
+func (p *Plugin) GetSubsReader(name endure.Named, pub pubsub.SubReader) {
+ p.subReaders[name.Name()] = pub
}
func (p *Plugin) Middleware(next http.Handler) http.Handler {
@@ -277,7 +272,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.connections.Store(connectionID, safeConn)
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, nil, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
err = e.StartCommandLoop()
@@ -361,55 +356,6 @@ func (p *Plugin) Reset() error {
return nil
}
-// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(m []byte) error {
- p.Lock()
- defer p.Unlock()
-
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- return err
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- return nil
-}
-
-func (p *Plugin) PublishAsync(m []byte) {
- go func() {
- p.Lock()
- defer p.Unlock()
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- p.log.Error("message unmarshal")
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- p.log.Error("publish async error", "error", err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- }()
-}
-
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
const op = errors.Op("access_validator")
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index a196d1f0..22042d8d 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -12,7 +12,7 @@ import (
)
type WorkersPool struct {
- storage map[string]pubsub.PubSub
+ storage map[string]pubsub.SubReader
connections *sync.Map
resPool sync.Pool
log logger.Logger
@@ -22,7 +22,7 @@ type WorkersPool struct {
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(pubsubs map[string]pubsub.SubReader, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
queue: make(chan *websocketsv1.Message, 100),