From dc3c5455e5c9b32737a0620c8bdb8bda0226dba7 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 27 May 2021 00:09:33 +0300 Subject: - Update all main abstractions - Desighn a new interfaces responsible for the whole PubSub - New plugin - websockets Signed-off-by: Valery Piashchynski --- plugins/websockets/connection/connection.go | 69 +++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 plugins/websockets/connection/connection.go (limited to 'plugins/websockets/connection/connection.go') diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go new file mode 100644 index 00000000..5eb30c61 --- /dev/null +++ b/plugins/websockets/connection/connection.go @@ -0,0 +1,69 @@ +package connection + +import ( + "sync" + + "github.com/fasthttp/websocket" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// Connection represents wrapped and safe to use from the different threads websocket connection +type Connection struct { + sync.RWMutex + log logger.Logger + conn *websocket.Conn +} + +func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection { + return &Connection{ + conn: wsConn, + log: log, + } +} + +func (c *Connection) Write(mt int, data []byte) error { + c.Lock() + defer c.Unlock() + + const op = errors.Op("websocket_write") + // handle a case when a goroutine tried to write into the closed connection + defer func() { + if r := recover(); r != nil { + c.log.Warn("panic handled, tried to write into the closed connection") + } + }() + + err := c.conn.WriteMessage(mt, data) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (c *Connection) Read() (int, []byte, error) { + //c.RLock() + //defer c.RUnlock() + const op = errors.Op("websocket_read") + + mt, data, err := c.conn.ReadMessage() + if err != nil { + return -1, nil, errors.E(op, err) + } + + return mt, data, nil +} + +func (c *Connection) Close() error { + c.Lock() + defer c.Unlock() + const op = errors.Op("websocket_close") + + err := c.conn.Close() + if err != nil { + return errors.E(op, err) + } + + return nil +} -- cgit v1.2.3 From 0a9aea326045e56716f0736f7aa8520305362c51 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 27 May 2021 13:26:12 +0300 Subject: - Move bst to the pkg folder - Add comments - Fix all golangci-lint warnings Signed-off-by: Valery Piashchynski --- plugins/websockets/connection/connection.go | 2 -- 1 file changed, 2 deletions(-) (limited to 'plugins/websockets/connection/connection.go') diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go index 5eb30c61..2b847173 100644 --- a/plugins/websockets/connection/connection.go +++ b/plugins/websockets/connection/connection.go @@ -43,8 +43,6 @@ func (c *Connection) Write(mt int, data []byte) error { } func (c *Connection) Read() (int, []byte, error) { - //c.RLock() - //defer c.RUnlock() const op = errors.Op("websocket_read") mt, data, err := c.conn.ReadMessage() -- cgit v1.2.3