summaryrefslogtreecommitdiff
path: root/plugins/websockets/pool
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-27 13:38:51 +0300
committerValery Piashchynski <[email protected]>2021-05-27 13:38:51 +0300
commit1c7c79ffc50721f586c582356d04fd826fc74811 (patch)
treed8207a4c8332dc83780809b5c689278afa5907a2 /plugins/websockets/pool
parent34df1626822613004d0974474c8bbe10cf2f1a94 (diff)
- Add more documetation
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/pool')
-rw-r--r--plugins/websockets/pool/workers_pool.go7
1 files changed, 5 insertions, 2 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index ee31d62f..8ff3d138 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -20,6 +20,7 @@ type WorkersPool struct {
exit chan struct{}
}
+// NewWorkersPool constructs worker pool for the websocket connections
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
@@ -33,6 +34,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
return make(map[string]struct{}, 10)
}
+ // start 10 workers
for i := 0; i < 10; i++ {
wp.do()
}
@@ -82,13 +84,14 @@ func (wp *WorkersPool) do() {
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- panic("not ok here (((")
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker(), "topics", msg.Topics())
+ continue
}
conn := c.(*connection.Connection)
err := conn.Write(websocket.BinaryMessage, msg.Payload())
if err != nil {
- // TODO handle error
+ wp.log.Error("error sending payload over the connection", "broker", msg.Broker(), "topics", msg.Topics())
wp.put(res)
continue
}