diff options
author | Valery Piashchynski <[email protected]> | 2021-05-29 00:24:30 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-29 00:24:30 +0300 |
commit | fcda08498e8f914bbd0798da898818cd5d0e4348 (patch) | |
tree | 62d88384d07997e2373f3b273ba0cb83569ebced /plugins/websockets/plugin.go | |
parent | 8f13eb958c7eec49acba6e343edb77c6ede89f09 (diff) |
- Add new internal plugin - channel. Which used to deliver messages from
the ws plugin to the http directly
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r-- | plugins/websockets/plugin.go | 61 |
1 files changed, 41 insertions, 20 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 76ef800d..2a060716 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -10,12 +10,15 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/channel" "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/http/attributes" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/storage" + "github.com/spiral/roadrunner/v2/plugins/websockets/validator" ) const ( @@ -23,7 +26,7 @@ const ( ) type Plugin struct { - sync.RWMutex + mu sync.RWMutex // Collection with all available pubsubs pubsubs map[string]pubsub.PubSub @@ -34,10 +37,13 @@ type Plugin struct { connections sync.Map storage *storage.Storage + // GO workers pool workersPool *pool.WorkersPool + + hub channel.Hub } -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, channel channel.Hub) error { const op = errors.Op("websockets_plugin_init") if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) @@ -52,6 +58,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { p.log = log p.storage = storage.NewStorage() p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log) + p.hub = channel return nil } @@ -69,10 +76,6 @@ func (p *Plugin) Serve() chan error { return } - if data == nil { - continue - } - p.workersPool.Queue(data) } }(v) @@ -115,6 +118,22 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { next.ServeHTTP(w, r) return } + p.mu.Lock() + + r = attributes.Init(r) + + err := validator.NewValidator().AssertServerAccess(p.hub, r) + if err != nil { + // show the error to the user + if av, ok := err.(*validator.AccessValidator); ok { + av.Copy(w) + } else { + w.WriteHeader(400) + return + } + } + + p.mu.Unlock() // connection upgrader upgraded := websocket.Upgrader{ @@ -154,13 +173,15 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { p.connections.Delete(connectionID) }() + p.mu.Lock() // Executor wraps a connection to have a safe abstraction - p.Lock() - e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs) - p.Unlock() + e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.hub, r) + p.mu.Unlock() p.log.Info("websocket client connected", "uuid", connectionID) + defer e.CleanUp() + err = e.StartCommandLoop() if err != nil { p.log.Error("command loop error", "error", err.Error()) @@ -170,32 +191,32 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { } // Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(msg []pubsub.Message) error { - p.Lock() - defer p.Unlock() +func (p *Plugin) Publish(msg []*pubsub.Message) error { + p.mu.Lock() + defer p.mu.Unlock() for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics()); j++ { - if br, ok := p.pubsubs[msg[i].Broker()]; ok { + for j := 0; j < len(msg[i].Topics); j++ { + if br, ok := p.pubsubs[msg[i].Broker]; ok { err := br.Publish(msg) if err != nil { return errors.E(err) } } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker()) + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker) } } } return nil } -func (p *Plugin) PublishAsync(msg []pubsub.Message) { +func (p *Plugin) PublishAsync(msg []*pubsub.Message) { go func() { - p.Lock() - defer p.Unlock() + p.mu.Lock() + defer p.mu.Unlock() for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics()); j++ { - err := p.pubsubs[msg[i].Broker()].Publish(msg) + for j := 0; j < len(msg[i].Topics); j++ { + err := p.pubsubs[msg[i].Broker].Publish(msg) if err != nil { p.log.Error("publish async error", "error", err) return |