diff options
author | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
commit | d0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch) | |
tree | 7e6ec1a320f596b31f205caee5d5753eaa42f4ff /plugins/websockets | |
parent | 0323e070103cc2c30d2cdfb12719d753acafe151 (diff) |
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/executor/executor.go | 26 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 48 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 63 |
3 files changed, 87 insertions, 50 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 24ea19ce..69aad7d4 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" ) @@ -23,9 +22,8 @@ type Response struct { type Executor struct { sync.Mutex - conn *connection.Connection - storage *storage.Storage - log logger.Logger + conn *connection.Connection + log logger.Logger // associated connection ID connID string @@ -39,12 +37,11 @@ type Executor struct { } // NewExecutor creates protected connection and starts command loop -func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, +func NewExecutor(conn *connection.Connection, log logger.Logger, connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor { return &Executor{ conn: conn, connID: connID, - storage: bst, log: log, pubsub: pubsubs, accessValidator: av, @@ -175,16 +172,14 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit func (e *Executor) Set(br pubsub.PubSub, topics []string) error { // associate connection with topics - err := br.Subscribe(topics...) + err := br.Subscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) // in case of error, unsubscribe connection from the dead topics - _ = br.Unsubscribe(topics...) + _ = br.Unsubscribe(e.connID, topics...) return err } - e.storage.InsertMany(e.connID, topics) - // save topics for the connection for i := 0; i < len(topics); i++ { e.actualTopics[topics[i]] = struct{}{} @@ -195,8 +190,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error { func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { // remove associated connections from the storage - e.storage.RemoveMany(e.connID, topics) - err := br.Unsubscribe(topics...) + err := br.Unsubscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) return err @@ -211,15 +205,15 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { } func (e *Executor) CleanUp() { + // unsubscribe particular connection from the topics for topic := range e.actualTopics { - // remove from the bst - e.storage.Remove(e.connID, topic) - + // here for _, ps := range e.pubsub { - _ = ps.Unsubscribe(topic) + _ = ps.Unsubscribe(e.connID, topic) } } + // clean up the actualTopics data for k := range e.actualTopics { delete(e.actualTopics, k) } diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index fe55d30e..4c0edcad 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -24,7 +24,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" - "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" "github.com/spiral/roadrunner/v2/utils" ) @@ -43,7 +42,6 @@ type Plugin struct { // global connections map connections sync.Map - storage *storage.Storage // GO workers pool workersPool *pool.WorkersPool @@ -73,10 +71,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.pubsubs = make(map[string]pubsub.PubSub) p.log = log - p.storage = storage.NewStorage() - p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log) p.wsUpgrade = &websocket.Upgrader{ HandshakeTimeout: time.Second * 60, + ReadBufferSize: 1024, + WriteBufferSize: 1024, } p.serveExit = make(chan struct{}) p.server = server @@ -107,6 +105,8 @@ func (p *Plugin) Serve() chan error { p.accessValidator = p.defaultAccessValidator(p.phpPool) }() + p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log) + // run all pubsubs drivers for _, v := range p.pubsubs { go func(ps pubsub.PubSub) { @@ -133,8 +133,13 @@ func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() p.Lock() + if p.phpPool == nil { + p.Unlock() + return nil + } p.phpPool.Destroy(context.Background()) p.Unlock() + return nil } @@ -206,27 +211,34 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { // store connection p.connections.Store(connectionID, safeConn) - defer func() { - // close the connection on exit - err = safeConn.Close() - if err != nil { - p.log.Error("connection close", "error", err) - } - - // when exiting - delete the connection - p.connections.Delete(connectionID) - }() - // Executor wraps a connection to have a safe abstraction - e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.accessValidator, r) + e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r) p.log.Info("websocket client connected", "uuid", connectionID) - defer e.CleanUp() err = e.StartCommandLoop() if err != nil { p.log.Error("command loop error, disconnecting", "error", err.Error()) return } + + // when exiting - delete the connection + p.connections.Delete(connectionID) + + // remove connection from all topics from all pub-sub drivers + e.CleanUp() + + err = r.Body.Close() + if err != nil { + p.log.Error("body close", "error", err) + } + + // close the connection on exit + err = safeConn.Close() + if err != nil { + p.log.Error("connection close", "error", err) + } + + safeConn = nil p.log.Info("disconnected", "connectionID", connectionID) }) } @@ -325,8 +337,6 @@ func (p *Plugin) PublishAsync(m []byte) { func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) { - p.RLock() - defer p.RUnlock() const op = errors.Op("access_validator") p.log.Debug("validation", "topics", topics) diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 7fcc873b..544f3ede 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -1,20 +1,22 @@ package pool import ( + "bytes" "sync" "github.com/fasthttp/websocket" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { - storage *storage.Storage + storage map[string]pubsub.PubSub connections *sync.Map resPool sync.Pool + bPool sync.Pool log logger.Logger queue chan *message.Message @@ -22,11 +24,11 @@ type WorkersPool struct { } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, queue: make(chan *message.Message, 100), - storage: storage, + storage: pubsubs, log: log, exit: make(chan struct{}), } @@ -34,9 +36,12 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger. wp.resPool.New = func() interface{} { return make(map[string]struct{}, 10) } + wp.bPool.New = func() interface{} { + return new(bytes.Buffer) + } // start 10 workers - for i := 0; i < 10; i++ { + for i := 0; i < 50; i++ { wp.do() } @@ -48,7 +53,7 @@ func (wp *WorkersPool) Queue(msg *message.Message) { } func (wp *WorkersPool) Stop() { - for i := 0; i < 10; i++ { + for i := 0; i < 50; i++ { wp.exit <- struct{}{} } @@ -68,6 +73,15 @@ func (wp *WorkersPool) get() map[string]struct{} { return wp.resPool.Get().(map[string]struct{}) } +func (wp *WorkersPool) putBytes(b *bytes.Buffer) { + b.Reset() + wp.bPool.Put(b) +} + +func (wp *WorkersPool) getBytes() *bytes.Buffer { + return wp.bPool.Get().(*bytes.Buffer) +} + func (wp *WorkersPool) do() { //nolint:gocognit go func() { for { @@ -83,13 +97,26 @@ func (wp *WorkersPool) do() { //nolint:gocognit if msg.TopicsLength() == 0 { continue } + + br, ok := wp.storage[utils.AsString(msg.Broker())] + if !ok { + wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage) + continue + } + res := wp.get() + bb := wp.getBytes() + for i := 0; i < msg.TopicsLength(); i++ { // get connections for the particular topic - wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res) + br.Connections(utils.AsString(msg.Topics(i)), res) } + if len(res) == 0 { - wp.log.Info("no such topic", "topic", msg.Topics) + for i := 0; i < msg.TopicsLength(); i++ { + wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i))) + } + wp.putBytes(bb) wp.put(res) continue } @@ -97,24 +124,30 @@ func (wp *WorkersPool) do() { //nolint:gocognit for i := range res { c, ok := wp.connections.Load(i) if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics) + for i := 0; i < msg.TopicsLength(); i++ { + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + } continue } conn := c.(*connection.Connection) - // TODO sync pool for the bytes - bb := make([]byte, msg.PayloadLength()) + + // put data into the bytes buffer for i := 0; i < msg.PayloadLength(); i++ { - bb[i] = byte(msg.Payload(i)) + bb.WriteByte(byte(msg.Payload(i))) } - err := conn.Write(websocket.BinaryMessage, bb) + err := conn.Write(websocket.BinaryMessage, bb.Bytes()) if err != nil { - wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics) - wp.put(res) + for i := 0; i < msg.TopicsLength(); i++ { + wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + } continue } } + // put bytes buffer back + wp.putBytes(bb) + // put map with results back wp.put(res) case <-wp.exit: wp.log.Info("get exit signal, exiting from the workers pool") |