summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go48
1 files changed, 29 insertions, 19 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index fe55d30e..4c0edcad 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -24,7 +24,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
- "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -43,7 +42,6 @@ type Plugin struct {
// global connections map
connections sync.Map
- storage *storage.Storage
// GO workers pool
workersPool *pool.WorkersPool
@@ -73,10 +71,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.pubsubs = make(map[string]pubsub.PubSub)
p.log = log
- p.storage = storage.NewStorage()
- p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
+ ReadBufferSize: 1024,
+ WriteBufferSize: 1024,
}
p.serveExit = make(chan struct{})
p.server = server
@@ -107,6 +105,8 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
+ p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log)
+
// run all pubsubs drivers
for _, v := range p.pubsubs {
go func(ps pubsub.PubSub) {
@@ -133,8 +133,13 @@ func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
p.Lock()
+ if p.phpPool == nil {
+ p.Unlock()
+ return nil
+ }
p.phpPool.Destroy(context.Background())
p.Unlock()
+
return nil
}
@@ -206,27 +211,34 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
// store connection
p.connections.Store(connectionID, safeConn)
- defer func() {
- // close the connection on exit
- err = safeConn.Close()
- if err != nil {
- p.log.Error("connection close", "error", err)
- }
-
- // when exiting - delete the connection
- p.connections.Delete(connectionID)
- }()
-
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
- defer e.CleanUp()
err = e.StartCommandLoop()
if err != nil {
p.log.Error("command loop error, disconnecting", "error", err.Error())
return
}
+
+ // when exiting - delete the connection
+ p.connections.Delete(connectionID)
+
+ // remove connection from all topics from all pub-sub drivers
+ e.CleanUp()
+
+ err = r.Body.Close()
+ if err != nil {
+ p.log.Error("body close", "error", err)
+ }
+
+ // close the connection on exit
+ err = safeConn.Close()
+ if err != nil {
+ p.log.Error("connection close", "error", err)
+ }
+
+ safeConn = nil
p.log.Info("disconnected", "connectionID", connectionID)
})
}
@@ -325,8 +337,6 @@ func (p *Plugin) PublishAsync(m []byte) {
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
- p.RLock()
- defer p.RUnlock()
const op = errors.Op("access_validator")
p.log.Debug("validation", "topics", topics)