summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/commands/enums.go9
-rw-r--r--plugins/websockets/config.go67
-rw-r--r--plugins/websockets/connection/connection.go69
-rw-r--r--plugins/websockets/doc/broadcast.drawio1
-rw-r--r--plugins/websockets/doc/broadcast_arch.drawio1
-rw-r--r--plugins/websockets/doc/ws.drawio1
-rw-r--r--plugins/websockets/executor/executor.go146
-rw-r--r--plugins/websockets/plugin.go206
-rw-r--r--plugins/websockets/pool/workers_pool.go104
-rw-r--r--plugins/websockets/rpc.go46
-rw-r--r--plugins/websockets/storage/storage.go50
-rw-r--r--plugins/websockets/validator/access_validator.go102
-rw-r--r--plugins/websockets/validator/access_validator_test.go35
13 files changed, 837 insertions, 0 deletions
diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go
new file mode 100644
index 00000000..18c63be3
--- /dev/null
+++ b/plugins/websockets/commands/enums.go
@@ -0,0 +1,9 @@
+package commands
+
+type Command string
+
+const (
+ Leave string = "leave"
+ Join string = "join"
+ Headers string = "headers"
+)
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
new file mode 100644
index 00000000..f3cb8e12
--- /dev/null
+++ b/plugins/websockets/config.go
@@ -0,0 +1,67 @@
+package websockets
+
+import "time"
+
+/*
+websockets:
+ # pubsubs should implement PubSub interface to be collected via endure.Collects
+ # also, they should implement RPC methods to publish data into them
+ # pubsubs might use general config section or its own
+
+ pubsubs:["redis", "amqp", "memory"]
+
+ # sample of the own config section for the redis pubsub driver
+ redis:
+ address:
+ - localhost:1111
+ .... the rest
+
+
+ # path used as websockets path
+ path: "/ws"
+*/
+
+// Config represents configuration for the ws plugin
+type Config struct {
+ // http path for the websocket
+ Path string `mapstructure:"path"`
+ // ["redis", "amqp", "memory"]
+ PubSubs []string `mapstructure:"pubsubs"`
+ Middleware []string `mapstructure:"middleware"`
+ Redis *RedisConfig `mapstructure:"redis"`
+}
+
+type RedisConfig struct {
+ Addrs []string `mapstructure:"addrs"`
+ DB int `mapstructure:"db"`
+ Username string `mapstructure:"username"`
+ Password string `mapstructure:"password"`
+ MasterName string `mapstructure:"master_name"`
+ SentinelPassword string `mapstructure:"sentinel_password"`
+ RouteByLatency bool `mapstructure:"route_by_latency"`
+ RouteRandomly bool `mapstructure:"route_randomly"`
+ MaxRetries int `mapstructure:"max_retries"`
+ DialTimeout time.Duration `mapstructure:"dial_timeout"`
+ MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"`
+ MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"`
+ PoolSize int `mapstructure:"pool_size"`
+ MinIdleConns int `mapstructure:"min_idle_conns"`
+ MaxConnAge time.Duration `mapstructure:"max_conn_age"`
+ ReadTimeout time.Duration `mapstructure:"read_timeout"`
+ WriteTimeout time.Duration `mapstructure:"write_timeout"`
+ PoolTimeout time.Duration `mapstructure:"pool_timeout"`
+ IdleTimeout time.Duration `mapstructure:"idle_timeout"`
+ IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"`
+ ReadOnly bool `mapstructure:"read_only"`
+}
+
+// InitDefault initialize default values for the ws config
+func (c *Config) InitDefault() {
+ if c.Path == "" {
+ c.Path = "/ws"
+ }
+ if len(c.PubSubs) == 0 {
+ // memory used by default
+ c.PubSubs = append(c.PubSubs, "memory")
+ }
+}
diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go
new file mode 100644
index 00000000..5eb30c61
--- /dev/null
+++ b/plugins/websockets/connection/connection.go
@@ -0,0 +1,69 @@
+package connection
+
+import (
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+// Connection represents wrapped and safe to use from the different threads websocket connection
+type Connection struct {
+ sync.RWMutex
+ log logger.Logger
+ conn *websocket.Conn
+}
+
+func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection {
+ return &Connection{
+ conn: wsConn,
+ log: log,
+ }
+}
+
+func (c *Connection) Write(mt int, data []byte) error {
+ c.Lock()
+ defer c.Unlock()
+
+ const op = errors.Op("websocket_write")
+ // handle a case when a goroutine tried to write into the closed connection
+ defer func() {
+ if r := recover(); r != nil {
+ c.log.Warn("panic handled, tried to write into the closed connection")
+ }
+ }()
+
+ err := c.conn.WriteMessage(mt, data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
+
+func (c *Connection) Read() (int, []byte, error) {
+ //c.RLock()
+ //defer c.RUnlock()
+ const op = errors.Op("websocket_read")
+
+ mt, data, err := c.conn.ReadMessage()
+ if err != nil {
+ return -1, nil, errors.E(op, err)
+ }
+
+ return mt, data, nil
+}
+
+func (c *Connection) Close() error {
+ c.Lock()
+ defer c.Unlock()
+ const op = errors.Op("websocket_close")
+
+ err := c.conn.Close()
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ return nil
+}
diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio
new file mode 100644
index 00000000..748fec45
--- /dev/null
+++ b/plugins/websockets/doc/broadcast.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-23T20:08:57.443Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="PhXRtBI6dFZzw_439Bi0" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">5VpZc+I4EP41VGUfoISNDx45ctUkm4NkZzIvU8KWjTbCYmWRwPz6lWzZ2JZhIByZrWWmiNU6aH19qLuthjmYLi4ZnE1uqY9IwwD+omEOG4bRtk1D/JGUZUrpdtyUEDLsq0Erwgj/RIoIFHWOfRSXBnJKCcezMtGjUYQ8XqJBxuh7eVhASflXZzBEGmHkQaJTv2KfTxS1a4BVxxXC4ST7aQOoninMRitCPIE+fS+QzPOGOWCU8vRpuhggItHLgEnnXazpzTljKOLbTFh2bm+eXvow8p5M8vrAMIh6za7ijS+zHSNfAKCalPEJDWkEyfmK2l9RbyidiWFtQfwbcb5U8oNzTgVpwqdE9aIF5t8Kzy/iGbQs1RpKdQFZY5k1Is6W34qNwizZXE1LWtk8HRcFVUznzEMbwMgUDLIQ8U3jlEQlVIVfULBfIjpFgiExgCECOX4r6xJUKhnm4/Kp9xQLng2gzMfOVEcZT6cDykuknKpZK9GLhwIbK1KiEDsoh2L4DZK52kL/8a43HPRGT4J89SygBmf3N8+X13/+oenR+wRzNJrBBPF34R3KGhFgQgaUUJaMNn2I3MAT9Jgz+ooKPbbnonEgZ9CIF+gg+WyS9xtiHC02Ckj1dswK0JmE3ws2nwljUjB3G6yXaUkau0JvaNB/He0JcWDJf7UQJx8d4vRzGIhzj64gNrpbQlzV+YNBbGoQP54Pr/dGOQgMr1aRfXtsWzUoH1KRqyh3nM9GuaOhfP1n8/b89u7xZW+kkb0Gaac7ThA9IdKW5X4y0q6GtNg6EUFRLFGej5szMg9xFGuwix3zMtKQ4DASz57ABwnw+hIXLKKinuqYYt9PYgGGYvwTjpOlJLIzeRYlO7P6DWso1xLBQJzGBe3jeGvT3lLJjQNAb3Z+DIMp/9JedBcE/ugPbl7+arYtDXvztIFVFj2lIZKzZWTVLkdWzhFDK7BtbLVlaKV0AbTaHaNbUodMGB8NvrIhNAhidJy4CuhHz71YCvSf9z1+UNu3kFPnFLu2Y8IjHz9O9xPjqMHwhUy/O/5iPut98f2bhyGPm/aJzDDPVVbpyUvJhOoNavcoeCsTrIVi2+zG2NICt7avvQRYd6qlqb6sBMivCRLfl3eS5cifKSM+Cxid5r0ejQIc6jmK3OMNHCNyvONPl9cmPdVMLa9sqB9pFGsHdSbYBC3bdsoeMRPpnulo0zRbZnnhtlle5CA+c5P2FvRAkyaMZ2kJKMAL5JdFWhOXVIWMp0ktKPGPytDbxoo+xNNQcE7wWHzDn3OG5CZDFCEGBfcXfVlsQqwVv4WHcaa2ZbSscqCje1PXycYU3alztEBHR/24YU1eI8q96Rb1orIPbv/CB3/co5rbxjQHrxftJURDDz/qxXpy17hR5/b3jaBlmFbZpDK/sqdrrESgRsvunsox6mWM++f+zfXoatdI8gAuywV2yygDbNl6XlzvtMyjOS397Ghr4PyP3Zilu7FNmrZ9amZUTA18xNZ6jMFlYYDyI2tN0bXKOYhdfS2y23jxkHJwzCxxk1iKhn11/wlGXU3qbGPLStchDLoWma6GzAi+oTzaf0fjmHqvUirZCYFppCH3Xyt7WZW3QfmbxqIc3JOWvTQ59B/vvpw/NswL+d8AzYZhw6lUyZAnIIDRfBx7DI8FvKfXZNsqpy/tGgRd54Qnk67IAjAioZqVsLH/mcu3xP2pcNFYKGJP9ILZQnynqKb0JpfHkuzrFPqkmjeVZss+pdylbh95lMHESpIx88hHjOAIrX5aPCkZpgyOM8KIs7nHRTqU9QgkxtXRgjar0iasSvnQpgkKeNrpys4qv090hr1kzjBZneEo3MTVMXgYZR6owgU4ywsTCtQcknu4JBT6+RQG3/VBJ99HwXgVX0Vfm9Vc8k2Bswlkvkd95NdssZ77ilOoeuBExXNHnfLbp2JUQJK7FbKgVpPDy/YFnGIiLf8KkTckVz24g+lWYgnLdFtZiaTgY8y609Ltyvr7kRy1nvHpAXDk9+SFGdGKaITW1TcbO93F+Mi9j48HsjX5+Ca3+8tAtiAyq0Zi1ofiWT13rLxJtIFbXiLdt3bLQ1/IrmifVVnocNdF6rVMT0V7tw87h6y/9Wte+5SveetRdtYGDXNS9a0EZ5TBBHmvibKGOBYBAJJnC5eHY1xwxKvx+gqPyMesUHj3mdB5tnGyIBZ52k0PKk7+EBG0Wa5NOHW3I5ya0sQhhHn5BoIF+xE9jB5vX69h8+rqu1sTQYuQGVwmx2mcRQ3Vg7MwZH00vT67UXj+VrmNXb234tZIBhwmxxTN1aXL1Pet7q6a5/8C</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/broadcast_arch.drawio b/plugins/websockets/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..54bba9c7
--- /dev/null
+++ b/plugins/websockets/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-24T11:49:56.412Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="onoEtIi9b01eD9L0oL7y" version="14.5.1" type="device"><diagram id="uVz6nu4iiJ3Jh2FwnUZb" name="Page-1">zZSxboMwEEC/hrES4JAma9I0WSpVYkCdKgdfsVXDIWMK6dfXNEcAoUjtUKUT9rvz2X4ceGybt3vDS/mEArQX+qL12IMXhsE6WrlHR05EAnZ/JplRgtgAYvUJBH2itRJQTRItoraqnMIUiwJSO2HcGGymaW+op7uWPIMZiFOu5zRRwsr+YqE/BA6gMtlvHfoUyXmfTaCSXGAzQmznsa1BtOdR3m5Bd/p6Med1j1eil5MZKOxPFpS71cthFbVJvE+Xr4k0+IZ3VOWD65pu/FwftaokmK64rjNV0PHtqZdisC4EdGV9j20aqSzEJU+7aOP6wDFpc+1mgRvSBmAstFdPHlx8uFYCzMGak0uhBeGCFFIXsTXNm9Er6TXL0dtgxDh1QXYpPXhyA1L1C23hTNvGIBcpr+y/sxUFt7bFZrYSOFaYvsPtbS2WU1uL6O9suenwuX/HRr9NtvsC</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/ws.drawio b/plugins/websockets/doc/ws.drawio
new file mode 100644
index 00000000..739b797a
--- /dev/null
+++ b/plugins/websockets/doc/ws.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
new file mode 100644
index 00000000..391c9a8c
--- /dev/null
+++ b/plugins/websockets/executor/executor.go
@@ -0,0 +1,146 @@
+package executor
+
+import (
+ "github.com/fasthttp/websocket"
+ json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/commands"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type Response struct {
+ Topic string `json:"topic"`
+ Payload []string `json:"payload"`
+}
+
+type Executor struct {
+ conn *connection.Connection
+ storage *storage.Storage
+ log logger.Logger
+
+ // associated connection ID
+ connID string
+ pubsub pubsub.PubSub
+}
+
+// NewExecutor creates protected connection and starts command loop
+func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor {
+ return &Executor{
+ conn: conn,
+ connID: connID,
+ storage: bst,
+ log: log,
+ pubsub: pubsubs,
+ }
+}
+
+func (e *Executor) StartCommandLoop() error {
+ for {
+ mt, data, err := e.conn.Read()
+ if err != nil {
+ if mt == -1 {
+ return err
+ }
+
+ return err
+ }
+
+ msg := &pubsub.Msg{}
+
+ err = json.Unmarshal(data, msg)
+ if err != nil {
+ e.log.Error("error unmarshal message", "error", err)
+ continue
+ }
+
+ switch msg.Command() {
+ // handle leave
+ case commands.Join:
+ // TODO access validators model update
+ //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...)
+ //// validation error
+ //if err != nil {
+ // e.log.Error("validation error", "error", err)
+ //
+ // resp := &Response{
+ // Topic: "#join",
+ // Payload: msg.Topics(),
+ // }
+ //
+ // packet, err := json.Marshal(resp)
+ // if err != nil {
+ // e.log.Error("error marshal the body", "error", err)
+ // return err
+ // }
+ //
+ // err = e.conn.Write(websocket.BinaryMessage, packet)
+ // if err != nil {
+ // e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ // continue
+ // }
+ //
+ // continue
+ //}
+ // associate connection with topics
+ e.storage.Store(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@join",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ err = e.pubsub.Subscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ // handle leave
+ case commands.Leave:
+ // remove associated connections from the storage
+ e.storage.Remove(e.connID, msg.Topics())
+
+ resp := &Response{
+ Topic: "@leave",
+ Payload: msg.Topics(),
+ }
+
+ packet, err := json.Marshal(resp)
+ if err != nil {
+ e.log.Error("error marshal the body", "error", err)
+ continue
+ }
+
+ err = e.pubsub.Unsubscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+
+ err = e.conn.Write(websocket.BinaryMessage, packet)
+ if err != nil {
+ e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ continue
+ }
+
+ case commands.Headers:
+
+ default:
+ e.log.Warn("unknown command", "command", msg.Command())
+ }
+ }
+}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
new file mode 100644
index 00000000..a247da69
--- /dev/null
+++ b/plugins/websockets/plugin.go
@@ -0,0 +1,206 @@
+package websockets
+
+import (
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/fasthttp/websocket"
+ "github.com/google/uuid"
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/pool"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+const (
+ PluginName string = "websockets"
+)
+
+type Plugin struct {
+ sync.RWMutex
+ // Collection with all available pubsubs
+ pubsubs map[string]pubsub.PubSub
+
+ Config *Config
+ log logger.Logger
+
+ // global connections map
+ connections sync.Map
+ storage *storage.Storage
+
+ workersPool *pool.WorkersPool
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("websockets_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ err := cfg.UnmarshalKey(PluginName, &p.Config)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.pubsubs = make(map[string]pubsub.PubSub)
+ p.log = log
+ p.storage = storage.NewStorage()
+ p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
+
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ errCh := make(chan error)
+ go func() {
+ ps := p.pubsubs["redis"]
+
+ for {
+ // get message
+ // get topic
+ // get connection uuid from the storage by the topic
+ // write payload into the connection
+ // do that in the workers pool
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
+ return
+ }
+
+ if data == nil {
+ continue
+ }
+
+ p.workersPool.Queue(data)
+ }
+ }()
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
+ p.workersPool.Stop()
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.GetPublishers,
+ }
+}
+
+func (p *Plugin) Available() {}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+// GetPublishers collects all pubsubs
+func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) {
+ p.pubsubs[name.Name()] = pub
+}
+
+func (p *Plugin) Middleware(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != p.Config.Path {
+ next.ServeHTTP(w, r)
+ return
+ }
+
+ // connection upgrader
+ upgraded := websocket.Upgrader{
+ HandshakeTimeout: time.Second * 60,
+ ReadBufferSize: 0,
+ WriteBufferSize: 0,
+ WriteBufferPool: nil,
+ Subprotocols: nil,
+ Error: nil,
+ CheckOrigin: nil,
+ EnableCompression: false,
+ }
+
+ // upgrade connection to websocket connection
+ _conn, err := upgraded.Upgrade(w, r, nil)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ safeConn := connection.NewConnection(_conn, p.log)
+ defer func() {
+ err = safeConn.Close()
+ if err != nil {
+ p.log.Error("connection close error", "error", err)
+ }
+ }()
+
+ // generate UUID from the connection
+ connectionID := uuid.NewString()
+ // store connection
+ p.connections.Store(connectionID, safeConn)
+ // when exiting - delete the connection
+ defer func() {
+ p.connections.Delete(connectionID)
+ }()
+
+ // Executor wraps a connection to have a safe abstraction
+ p.Lock()
+ e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"])
+ p.Unlock()
+
+ p.log.Info("websocket client connected", "uuid", connectionID)
+
+ err = e.StartCommandLoop()
+ if err != nil {
+ p.log.Error("command loop error", "error", err.Error())
+ return
+ }
+ })
+}
+
+func (p *Plugin) Publish(msg []pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ if br, ok := p.pubsubs[msg[i].Broker()]; ok {
+ err := br.Publish(msg)
+ if err != nil {
+ return errors.E(err)
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker())
+ }
+ }
+ }
+ return nil
+}
+
+func (p *Plugin) PublishAsync(msg []pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ for i := 0; i < len(msg); i++ {
+ for j := 0; j < len(msg[i].Topics()); j++ {
+ err := p.pubsubs[msg[i].Broker()].Publish(msg)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ return
+ }
+
+ }
+ }
+ }()
+}
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
new file mode 100644
index 00000000..ee31d62f
--- /dev/null
+++ b/plugins/websockets/pool/workers_pool.go
@@ -0,0 +1,104 @@
+package pool
+
+import (
+ "sync"
+
+ "github.com/fasthttp/websocket"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+)
+
+type WorkersPool struct {
+ storage *storage.Storage
+ connections *sync.Map
+ resPool sync.Pool
+ log logger.Logger
+
+ queue chan pubsub.Message
+ exit chan struct{}
+}
+
+func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
+ wp := &WorkersPool{
+ connections: connections,
+ queue: make(chan pubsub.Message, 100),
+ storage: storage,
+ log: log,
+ exit: make(chan struct{}),
+ }
+
+ wp.resPool.New = func() interface{} {
+ return make(map[string]struct{}, 10)
+ }
+
+ for i := 0; i < 10; i++ {
+ wp.do()
+ }
+
+ return wp
+}
+
+func (wp *WorkersPool) Queue(msg pubsub.Message) {
+ wp.queue <- msg
+}
+
+func (wp *WorkersPool) Stop() {
+ for i := 0; i < 10; i++ {
+ wp.exit <- struct{}{}
+ }
+
+ close(wp.exit)
+}
+
+func (wp *WorkersPool) put(res map[string]struct{}) {
+ // optimized
+ // https://go-review.googlesource.com/c/go/+/110055/
+ // not O(n), but O(1)
+ for k := range res {
+ delete(res, k)
+ }
+}
+
+func (wp *WorkersPool) get() map[string]struct{} {
+ return wp.resPool.Get().(map[string]struct{})
+}
+
+func (wp *WorkersPool) do() {
+ go func() {
+ for {
+ select {
+ case msg := <-wp.queue:
+ res := wp.get()
+ // get connections for the particular topic
+ wp.storage.Get(msg.Topics(), res)
+ if len(res) == 0 {
+ wp.log.Info("no such topic", "topic", msg.Topics())
+ wp.put(res)
+ continue
+ }
+
+ for i := range res {
+ c, ok := wp.connections.Load(i)
+ if !ok {
+ panic("not ok here (((")
+ }
+
+ conn := c.(*connection.Connection)
+ err := conn.Write(websocket.BinaryMessage, msg.Payload())
+ if err != nil {
+ // TODO handle error
+ wp.put(res)
+ continue
+ }
+ }
+
+ wp.put(res)
+ case <-wp.exit:
+ wp.log.Info("get exit signal, exiting from the workers pool")
+ return
+ }
+ }
+ }()
+}
diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go
new file mode 100644
index 00000000..0f0303b7
--- /dev/null
+++ b/plugins/websockets/rpc.go
@@ -0,0 +1,46 @@
+package websockets
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error {
+ const op = errors.Op("broadcast_publish")
+
+ // publish to the registered broker
+ mi := make([]pubsub.Message, 0, len(msg))
+ // golang can't convert slice in-place
+ // so, we need to convert it manually
+ for i := 0; i < len(msg); i++ {
+ mi = append(mi, msg[i])
+ }
+ err := r.plugin.Publish(mi)
+ if err != nil {
+ *ok = false
+ return errors.E(op, err)
+ }
+ *ok = true
+ return nil
+}
+
+func (r *rpc) PublishAsync(msg []*pubsub.Msg, ok *bool) error {
+ // publish to the registered broker
+ mi := make([]pubsub.Message, 0, len(msg))
+ // golang can't convert slice in-place
+ // so, we need to convert it manually
+ for i := 0; i < len(msg); i++ {
+ mi = append(mi, msg[i])
+ }
+
+ r.plugin.PublishAsync(mi)
+
+ *ok = true
+ return nil
+}
diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go
new file mode 100644
index 00000000..a7e49207
--- /dev/null
+++ b/plugins/websockets/storage/storage.go
@@ -0,0 +1,50 @@
+package storage
+
+import (
+ "sync"
+
+ "github.com/spiral/roadrunner/v2/plugins/memory/bst"
+)
+
+type Storage struct {
+ sync.RWMutex
+ BST bst.Storage
+}
+
+func NewStorage() *Storage {
+ return &Storage{
+ BST: bst.NewBST(),
+ }
+}
+
+func (s *Storage) Store(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Insert(connID, topics[i])
+ }
+}
+
+func (s *Storage) Remove(connID string, topics []string) {
+ s.Lock()
+ defer s.Unlock()
+
+ for i := 0; i < len(topics); i++ {
+ s.BST.Remove(connID, topics[i])
+ }
+}
+
+func (s *Storage) Get(topics []string, res map[string]struct{}) {
+ s.RLock()
+ defer s.RUnlock()
+
+ for i := 0; i < len(topics); i++ {
+ d := s.BST.Get(topics[i])
+ if len(d) > 0 {
+ for ii := range d {
+ res[ii] = struct{}{}
+ }
+ }
+ }
+}
diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go
new file mode 100644
index 00000000..9d9522d4
--- /dev/null
+++ b/plugins/websockets/validator/access_validator.go
@@ -0,0 +1,102 @@
+package validator
+
+import (
+ "bytes"
+ "io"
+ "net/http"
+ "strings"
+
+ "github.com/spiral/roadrunner/v2/plugins/http/attributes"
+)
+
+type AccessValidator struct {
+ buffer *bytes.Buffer
+ header http.Header
+ status int
+}
+
+func NewValidator() *AccessValidator {
+ return &AccessValidator{
+ buffer: bytes.NewBuffer(nil),
+ header: make(http.Header),
+ }
+}
+
+// Copy all content to parent response writer.
+func (w *AccessValidator) Copy(rw http.ResponseWriter) {
+ rw.WriteHeader(w.status)
+
+ for k, v := range w.header {
+ for _, vv := range v {
+ rw.Header().Add(k, vv)
+ }
+ }
+
+ _, _ = io.Copy(rw, w.buffer)
+}
+
+// Header returns the header map that will be sent by WriteHeader.
+func (w *AccessValidator) Header() http.Header {
+ return w.header
+}
+
+// Write writes the data to the connection as part of an HTTP reply.
+func (w *AccessValidator) Write(p []byte) (int, error) {
+ return w.buffer.Write(p)
+}
+
+// WriteHeader sends an HTTP response header with the provided status code.
+func (w *AccessValidator) WriteHeader(statusCode int) {
+ w.status = statusCode
+}
+
+// IsOK returns true if response contained 200 status code.
+func (w *AccessValidator) IsOK() bool {
+ return w.status == 200
+}
+
+// Body returns response body to rely to user.
+func (w *AccessValidator) Body() []byte {
+ return w.buffer.Bytes()
+}
+
+// Error contains server response.
+func (w *AccessValidator) Error() string {
+ return w.buffer.String()
+}
+
+// AssertServerAccess checks if user can join server and returns error and body if user can not. Must return nil in
+// case of error
+func (w *AccessValidator) AssertServerAccess(f http.HandlerFunc, r *http.Request) error {
+ if err := attributes.Set(r, "ws:joinServer", true); err != nil {
+ return err
+ }
+
+ defer delete(attributes.All(r), "ws:joinServer")
+
+ f(w, r)
+
+ if !w.IsOK() {
+ return w
+ }
+
+ return nil
+}
+
+// AssertTopicsAccess checks if user can access given upstream, the application will receive all user headers and cookies.
+// the decision to authorize user will be based on response code (200).
+func (w *AccessValidator) AssertTopicsAccess(f http.HandlerFunc, r *http.Request, channels ...string) error {
+ if err := attributes.Set(r, "ws:joinTopics", strings.Join(channels, ",")); err != nil {
+ return err
+ }
+
+ defer delete(attributes.All(r), "ws:joinTopics")
+
+ f(w, r)
+
+ if !w.IsOK() {
+ return w
+ }
+
+ return nil
+}
diff --git a/plugins/websockets/validator/access_validator_test.go b/plugins/websockets/validator/access_validator_test.go
new file mode 100644
index 00000000..4a07b00f
--- /dev/null
+++ b/plugins/websockets/validator/access_validator_test.go
@@ -0,0 +1,35 @@
+package validator
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestResponseWrapper_Body(t *testing.T) {
+ w := NewValidator()
+ _, _ = w.Write([]byte("hello"))
+
+ assert.Equal(t, []byte("hello"), w.Body())
+}
+
+func TestResponseWrapper_Header(t *testing.T) {
+ w := NewValidator()
+ w.Header().Set("k", "value")
+
+ assert.Equal(t, "value", w.Header().Get("k"))
+}
+
+func TestResponseWrapper_StatusCode(t *testing.T) {
+ w := NewValidator()
+ w.WriteHeader(200)
+
+ assert.True(t, w.IsOK())
+}
+
+func TestResponseWrapper_StatusCodeBad(t *testing.T) {
+ w := NewValidator()
+ w.WriteHeader(400)
+
+ assert.False(t, w.IsOK())
+}