diff options
author | Valery Piashchynski <[email protected]> | 2021-05-27 13:38:51 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-27 13:38:51 +0300 |
commit | 1c7c79ffc50721f586c582356d04fd826fc74811 (patch) | |
tree | d8207a4c8332dc83780809b5c689278afa5907a2 /plugins/websockets | |
parent | 34df1626822613004d0974474c8bbe10cf2f1a94 (diff) |
- Add more documetation
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/executor/executor.go | 9 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 3 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 7 |
3 files changed, 15 insertions, 4 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 9ef5e40a..048a41ed 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -3,6 +3,7 @@ package executor import ( "github.com/fasthttp/websocket" json "github.com/json-iterator/go" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" @@ -22,6 +23,8 @@ type Executor struct { // associated connection ID connID string + + // map with the pubsub drivers pubsub map[string]pubsub.PubSub } @@ -37,14 +40,16 @@ func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.St } func (e *Executor) StartCommandLoop() error { + const op = errors.Op("executor_command_loop") for { mt, data, err := e.conn.Read() if err != nil { if mt == -1 { - return err + e.log.Error("socket was closed", "error", err, "message type", mt) + return nil } - return err + return errors.E(op, err) } msg := &pubsub.Msg{} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index bc5028e6..76ef800d 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -134,8 +134,11 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { http.Error(w, err.Error(), http.StatusInternalServerError) return } + + // construct safe connection protected by mutexes safeConn := connection.NewConnection(_conn, p.log) defer func() { + // close the connection on exit err = safeConn.Close() if err != nil { p.log.Error("connection close error", "error", err) diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index ee31d62f..8ff3d138 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -20,6 +20,7 @@ type WorkersPool struct { exit chan struct{} } +// NewWorkersPool constructs worker pool for the websocket connections func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, @@ -33,6 +34,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger. return make(map[string]struct{}, 10) } + // start 10 workers for i := 0; i < 10; i++ { wp.do() } @@ -82,13 +84,14 @@ func (wp *WorkersPool) do() { for i := range res { c, ok := wp.connections.Load(i) if !ok { - panic("not ok here (((") + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker(), "topics", msg.Topics()) + continue } conn := c.(*connection.Connection) err := conn.Write(websocket.BinaryMessage, msg.Payload()) if err != nil { - // TODO handle error + wp.log.Error("error sending payload over the connection", "broker", msg.Broker(), "topics", msg.Topics()) wp.put(res) continue } |