summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-05-27 13:38:51 +0300
committerValery Piashchynski <[email protected]>2021-05-27 13:38:51 +0300
commit1c7c79ffc50721f586c582356d04fd826fc74811 (patch)
treed8207a4c8332dc83780809b5c689278afa5907a2 /plugins/websockets
parent34df1626822613004d0974474c8bbe10cf2f1a94 (diff)
- Add more documetation
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/executor/executor.go9
-rw-r--r--plugins/websockets/plugin.go3
-rw-r--r--plugins/websockets/pool/workers_pool.go7
3 files changed, 15 insertions, 4 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 9ef5e40a..048a41ed 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -3,6 +3,7 @@ package executor
import (
"github.com/fasthttp/websocket"
json "github.com/json-iterator/go"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
@@ -22,6 +23,8 @@ type Executor struct {
// associated connection ID
connID string
+
+ // map with the pubsub drivers
pubsub map[string]pubsub.PubSub
}
@@ -37,14 +40,16 @@ func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.St
}
func (e *Executor) StartCommandLoop() error {
+ const op = errors.Op("executor_command_loop")
for {
mt, data, err := e.conn.Read()
if err != nil {
if mt == -1 {
- return err
+ e.log.Error("socket was closed", "error", err, "message type", mt)
+ return nil
}
- return err
+ return errors.E(op, err)
}
msg := &pubsub.Msg{}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index bc5028e6..76ef800d 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -134,8 +134,11 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
+
+ // construct safe connection protected by mutexes
safeConn := connection.NewConnection(_conn, p.log)
defer func() {
+ // close the connection on exit
err = safeConn.Close()
if err != nil {
p.log.Error("connection close error", "error", err)
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index ee31d62f..8ff3d138 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -20,6 +20,7 @@ type WorkersPool struct {
exit chan struct{}
}
+// NewWorkersPool constructs worker pool for the websocket connections
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
@@ -33,6 +34,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
return make(map[string]struct{}, 10)
}
+ // start 10 workers
for i := 0; i < 10; i++ {
wp.do()
}
@@ -82,13 +84,14 @@ func (wp *WorkersPool) do() {
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- panic("not ok here (((")
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker(), "topics", msg.Topics())
+ continue
}
conn := c.(*connection.Connection)
err := conn.Write(websocket.BinaryMessage, msg.Payload())
if err != nil {
- // TODO handle error
+ wp.log.Error("error sending payload over the connection", "broker", msg.Broker(), "topics", msg.Topics())
wp.put(res)
continue
}