diff options
author | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
commit | d0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch) | |
tree | 7e6ec1a320f596b31f205caee5d5753eaa42f4ff /plugins/websockets/plugin.go | |
parent | 0323e070103cc2c30d2cdfb12719d753acafe151 (diff) |
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r-- | plugins/websockets/plugin.go | 48 |
1 files changed, 29 insertions, 19 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index fe55d30e..4c0edcad 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -24,7 +24,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" - "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" "github.com/spiral/roadrunner/v2/utils" ) @@ -43,7 +42,6 @@ type Plugin struct { // global connections map connections sync.Map - storage *storage.Storage // GO workers pool workersPool *pool.WorkersPool @@ -73,10 +71,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.pubsubs = make(map[string]pubsub.PubSub) p.log = log - p.storage = storage.NewStorage() - p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log) p.wsUpgrade = &websocket.Upgrader{ HandshakeTimeout: time.Second * 60, + ReadBufferSize: 1024, + WriteBufferSize: 1024, } p.serveExit = make(chan struct{}) p.server = server @@ -107,6 +105,8 @@ func (p *Plugin) Serve() chan error { p.accessValidator = p.defaultAccessValidator(p.phpPool) }() + p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log) + // run all pubsubs drivers for _, v := range p.pubsubs { go func(ps pubsub.PubSub) { @@ -133,8 +133,13 @@ func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() p.Lock() + if p.phpPool == nil { + p.Unlock() + return nil + } p.phpPool.Destroy(context.Background()) p.Unlock() + return nil } @@ -206,27 +211,34 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { // store connection p.connections.Store(connectionID, safeConn) - defer func() { - // close the connection on exit - err = safeConn.Close() - if err != nil { - p.log.Error("connection close", "error", err) - } - - // when exiting - delete the connection - p.connections.Delete(connectionID) - }() - // Executor wraps a connection to have a safe abstraction - e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.accessValidator, r) + e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r) p.log.Info("websocket client connected", "uuid", connectionID) - defer e.CleanUp() err = e.StartCommandLoop() if err != nil { p.log.Error("command loop error, disconnecting", "error", err.Error()) return } + + // when exiting - delete the connection + p.connections.Delete(connectionID) + + // remove connection from all topics from all pub-sub drivers + e.CleanUp() + + err = r.Body.Close() + if err != nil { + p.log.Error("body close", "error", err) + } + + // close the connection on exit + err = safeConn.Close() + if err != nil { + p.log.Error("connection close", "error", err) + } + + safeConn = nil p.log.Info("disconnected", "connectionID", connectionID) }) } @@ -325,8 +337,6 @@ func (p *Plugin) PublishAsync(m []byte) { func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) { - p.RLock() - defer p.RUnlock() const op = errors.Op("access_validator") p.log.Debug("validation", "topics", topics) |