summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-29 00:24:30 +0300
committerValery Piashchynski <[email protected]>2021-05-29 00:24:30 +0300
commitfcda08498e8f914bbd0798da898818cd5d0e4348 (patch)
tree62d88384d07997e2373f3b273ba0cb83569ebced /plugins/websockets/plugin.go
parent8f13eb958c7eec49acba6e343edb77c6ede89f09 (diff)
- Add new internal plugin - channel. Which used to deliver messages from
the ws plugin to the http directly Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go61
1 files changed, 41 insertions, 20 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 76ef800d..2a060716 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -10,12 +10,15 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/channel"
"github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/http/attributes"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/validator"
)
const (
@@ -23,7 +26,7 @@ const (
)
type Plugin struct {
- sync.RWMutex
+ mu sync.RWMutex
// Collection with all available pubsubs
pubsubs map[string]pubsub.PubSub
@@ -34,10 +37,13 @@ type Plugin struct {
connections sync.Map
storage *storage.Storage
+ // GO workers pool
workersPool *pool.WorkersPool
+
+ hub channel.Hub
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, channel channel.Hub) error {
const op = errors.Op("websockets_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -52,6 +58,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
p.log = log
p.storage = storage.NewStorage()
p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
+ p.hub = channel
return nil
}
@@ -69,10 +76,6 @@ func (p *Plugin) Serve() chan error {
return
}
- if data == nil {
- continue
- }
-
p.workersPool.Queue(data)
}
}(v)
@@ -115,6 +118,22 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
next.ServeHTTP(w, r)
return
}
+ p.mu.Lock()
+
+ r = attributes.Init(r)
+
+ err := validator.NewValidator().AssertServerAccess(p.hub, r)
+ if err != nil {
+ // show the error to the user
+ if av, ok := err.(*validator.AccessValidator); ok {
+ av.Copy(w)
+ } else {
+ w.WriteHeader(400)
+ return
+ }
+ }
+
+ p.mu.Unlock()
// connection upgrader
upgraded := websocket.Upgrader{
@@ -154,13 +173,15 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.connections.Delete(connectionID)
}()
+ p.mu.Lock()
// Executor wraps a connection to have a safe abstraction
- p.Lock()
- e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs)
- p.Unlock()
+ e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.hub, r)
+ p.mu.Unlock()
p.log.Info("websocket client connected", "uuid", connectionID)
+ defer e.CleanUp()
+
err = e.StartCommandLoop()
if err != nil {
p.log.Error("command loop error", "error", err.Error())
@@ -170,32 +191,32 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
}
// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(msg []pubsub.Message) error {
- p.Lock()
- defer p.Unlock()
+func (p *Plugin) Publish(msg []*pubsub.Message) error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics()); j++ {
- if br, ok := p.pubsubs[msg[i].Broker()]; ok {
+ for j := 0; j < len(msg[i].Topics); j++ {
+ if br, ok := p.pubsubs[msg[i].Broker]; ok {
err := br.Publish(msg)
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker)
}
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
go func() {
- p.Lock()
- defer p.Unlock()
+ p.mu.Lock()
+ defer p.mu.Unlock()
for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics()); j++ {
- err := p.pubsubs[msg[i].Broker()].Publish(msg)
+ for j := 0; j < len(msg[i].Topics); j++ {
+ err := p.pubsubs[msg[i].Broker].Publish(msg)
if err != nil {
p.log.Error("publish async error", "error", err)
return