diff options
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/commands/enums.go | 9 | ||||
-rw-r--r-- | plugins/websockets/config.go | 67 | ||||
-rw-r--r-- | plugins/websockets/connection/connection.go | 69 | ||||
-rw-r--r-- | plugins/websockets/doc/broadcast.drawio | 1 | ||||
-rw-r--r-- | plugins/websockets/doc/broadcast_arch.drawio | 1 | ||||
-rw-r--r-- | plugins/websockets/doc/ws.drawio | 1 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 146 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 206 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 104 | ||||
-rw-r--r-- | plugins/websockets/rpc.go | 46 | ||||
-rw-r--r-- | plugins/websockets/storage/storage.go | 50 | ||||
-rw-r--r-- | plugins/websockets/validator/access_validator.go | 102 | ||||
-rw-r--r-- | plugins/websockets/validator/access_validator_test.go | 35 |
13 files changed, 837 insertions, 0 deletions
diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go new file mode 100644 index 00000000..18c63be3 --- /dev/null +++ b/plugins/websockets/commands/enums.go @@ -0,0 +1,9 @@ +package commands + +type Command string + +const ( + Leave string = "leave" + Join string = "join" + Headers string = "headers" +) diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go new file mode 100644 index 00000000..f3cb8e12 --- /dev/null +++ b/plugins/websockets/config.go @@ -0,0 +1,67 @@ +package websockets + +import "time" + +/* +websockets: + # pubsubs should implement PubSub interface to be collected via endure.Collects + # also, they should implement RPC methods to publish data into them + # pubsubs might use general config section or its own + + pubsubs:["redis", "amqp", "memory"] + + # sample of the own config section for the redis pubsub driver + redis: + address: + - localhost:1111 + .... the rest + + + # path used as websockets path + path: "/ws" +*/ + +// Config represents configuration for the ws plugin +type Config struct { + // http path for the websocket + Path string `mapstructure:"path"` + // ["redis", "amqp", "memory"] + PubSubs []string `mapstructure:"pubsubs"` + Middleware []string `mapstructure:"middleware"` + Redis *RedisConfig `mapstructure:"redis"` +} + +type RedisConfig struct { + Addrs []string `mapstructure:"addrs"` + DB int `mapstructure:"db"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + MasterName string `mapstructure:"master_name"` + SentinelPassword string `mapstructure:"sentinel_password"` + RouteByLatency bool `mapstructure:"route_by_latency"` + RouteRandomly bool `mapstructure:"route_randomly"` + MaxRetries int `mapstructure:"max_retries"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` + MinRetryBackoff time.Duration `mapstructure:"min_retry_backoff"` + MaxRetryBackoff time.Duration `mapstructure:"max_retry_backoff"` + PoolSize int `mapstructure:"pool_size"` + MinIdleConns int `mapstructure:"min_idle_conns"` + MaxConnAge time.Duration `mapstructure:"max_conn_age"` + ReadTimeout time.Duration `mapstructure:"read_timeout"` + WriteTimeout time.Duration `mapstructure:"write_timeout"` + PoolTimeout time.Duration `mapstructure:"pool_timeout"` + IdleTimeout time.Duration `mapstructure:"idle_timeout"` + IdleCheckFreq time.Duration `mapstructure:"idle_check_freq"` + ReadOnly bool `mapstructure:"read_only"` +} + +// InitDefault initialize default values for the ws config +func (c *Config) InitDefault() { + if c.Path == "" { + c.Path = "/ws" + } + if len(c.PubSubs) == 0 { + // memory used by default + c.PubSubs = append(c.PubSubs, "memory") + } +} diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go new file mode 100644 index 00000000..5eb30c61 --- /dev/null +++ b/plugins/websockets/connection/connection.go @@ -0,0 +1,69 @@ +package connection + +import ( + "sync" + + "github.com/fasthttp/websocket" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +// Connection represents wrapped and safe to use from the different threads websocket connection +type Connection struct { + sync.RWMutex + log logger.Logger + conn *websocket.Conn +} + +func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection { + return &Connection{ + conn: wsConn, + log: log, + } +} + +func (c *Connection) Write(mt int, data []byte) error { + c.Lock() + defer c.Unlock() + + const op = errors.Op("websocket_write") + // handle a case when a goroutine tried to write into the closed connection + defer func() { + if r := recover(); r != nil { + c.log.Warn("panic handled, tried to write into the closed connection") + } + }() + + err := c.conn.WriteMessage(mt, data) + if err != nil { + return errors.E(op, err) + } + + return nil +} + +func (c *Connection) Read() (int, []byte, error) { + //c.RLock() + //defer c.RUnlock() + const op = errors.Op("websocket_read") + + mt, data, err := c.conn.ReadMessage() + if err != nil { + return -1, nil, errors.E(op, err) + } + + return mt, data, nil +} + +func (c *Connection) Close() error { + c.Lock() + defer c.Unlock() + const op = errors.Op("websocket_close") + + err := c.conn.Close() + if err != nil { + return errors.E(op, err) + } + + return nil +} diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio new file mode 100644 index 00000000..748fec45 --- /dev/null +++ b/plugins/websockets/doc/broadcast.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-05-23T20:08:57.443Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="PhXRtBI6dFZzw_439Bi0" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">5VpZc+I4EP41VGUfoISNDx45ctUkm4NkZzIvU8KWjTbCYmWRwPz6lWzZ2JZhIByZrWWmiNU6aH19qLuthjmYLi4ZnE1uqY9IwwD+omEOG4bRtk1D/JGUZUrpdtyUEDLsq0Erwgj/RIoIFHWOfRSXBnJKCcezMtGjUYQ8XqJBxuh7eVhASflXZzBEGmHkQaJTv2KfTxS1a4BVxxXC4ST7aQOoninMRitCPIE+fS+QzPOGOWCU8vRpuhggItHLgEnnXazpzTljKOLbTFh2bm+eXvow8p5M8vrAMIh6za7ijS+zHSNfAKCalPEJDWkEyfmK2l9RbyidiWFtQfwbcb5U8oNzTgVpwqdE9aIF5t8Kzy/iGbQs1RpKdQFZY5k1Is6W34qNwizZXE1LWtk8HRcFVUznzEMbwMgUDLIQ8U3jlEQlVIVfULBfIjpFgiExgCECOX4r6xJUKhnm4/Kp9xQLng2gzMfOVEcZT6cDykuknKpZK9GLhwIbK1KiEDsoh2L4DZK52kL/8a43HPRGT4J89SygBmf3N8+X13/+oenR+wRzNJrBBPF34R3KGhFgQgaUUJaMNn2I3MAT9Jgz+ooKPbbnonEgZ9CIF+gg+WyS9xtiHC02Ckj1dswK0JmE3ws2nwljUjB3G6yXaUkau0JvaNB/He0JcWDJf7UQJx8d4vRzGIhzj64gNrpbQlzV+YNBbGoQP54Pr/dGOQgMr1aRfXtsWzUoH1KRqyh3nM9GuaOhfP1n8/b89u7xZW+kkb0Gaac7ThA9IdKW5X4y0q6GtNg6EUFRLFGej5szMg9xFGuwix3zMtKQ4DASz57ABwnw+hIXLKKinuqYYt9PYgGGYvwTjpOlJLIzeRYlO7P6DWso1xLBQJzGBe3jeGvT3lLJjQNAb3Z+DIMp/9JedBcE/ugPbl7+arYtDXvztIFVFj2lIZKzZWTVLkdWzhFDK7BtbLVlaKV0AbTaHaNbUodMGB8NvrIhNAhidJy4CuhHz71YCvSf9z1+UNu3kFPnFLu2Y8IjHz9O9xPjqMHwhUy/O/5iPut98f2bhyGPm/aJzDDPVVbpyUvJhOoNavcoeCsTrIVi2+zG2NICt7avvQRYd6qlqb6sBMivCRLfl3eS5cifKSM+Cxid5r0ejQIc6jmK3OMNHCNyvONPl9cmPdVMLa9sqB9pFGsHdSbYBC3bdsoeMRPpnulo0zRbZnnhtlle5CA+c5P2FvRAkyaMZ2kJKMAL5JdFWhOXVIWMp0ktKPGPytDbxoo+xNNQcE7wWHzDn3OG5CZDFCEGBfcXfVlsQqwVv4WHcaa2ZbSscqCje1PXycYU3alztEBHR/24YU1eI8q96Rb1orIPbv/CB3/co5rbxjQHrxftJURDDz/qxXpy17hR5/b3jaBlmFbZpDK/sqdrrESgRsvunsox6mWM++f+zfXoatdI8gAuywV2yygDbNl6XlzvtMyjOS397Ghr4PyP3Zilu7FNmrZ9amZUTA18xNZ6jMFlYYDyI2tN0bXKOYhdfS2y23jxkHJwzCxxk1iKhn11/wlGXU3qbGPLStchDLoWma6GzAi+oTzaf0fjmHqvUirZCYFppCH3Xyt7WZW3QfmbxqIc3JOWvTQ59B/vvpw/NswL+d8AzYZhw6lUyZAnIIDRfBx7DI8FvKfXZNsqpy/tGgRd54Qnk67IAjAioZqVsLH/mcu3xP2pcNFYKGJP9ILZQnynqKb0JpfHkuzrFPqkmjeVZss+pdylbh95lMHESpIx88hHjOAIrX5aPCkZpgyOM8KIs7nHRTqU9QgkxtXRgjar0iasSvnQpgkKeNrpys4qv090hr1kzjBZneEo3MTVMXgYZR6owgU4ywsTCtQcknu4JBT6+RQG3/VBJ99HwXgVX0Vfm9Vc8k2Bswlkvkd95NdssZ77ilOoeuBExXNHnfLbp2JUQJK7FbKgVpPDy/YFnGIiLf8KkTckVz24g+lWYgnLdFtZiaTgY8y609Ltyvr7kRy1nvHpAXDk9+SFGdGKaITW1TcbO93F+Mi9j48HsjX5+Ca3+8tAtiAyq0Zi1ofiWT13rLxJtIFbXiLdt3bLQ1/IrmifVVnocNdF6rVMT0V7tw87h6y/9Wte+5SveetRdtYGDXNS9a0EZ5TBBHmvibKGOBYBAJJnC5eHY1xwxKvx+gqPyMesUHj3mdB5tnGyIBZ52k0PKk7+EBG0Wa5NOHW3I5ya0sQhhHn5BoIF+xE9jB5vX69h8+rqu1sTQYuQGVwmx2mcRQ3Vg7MwZH00vT67UXj+VrmNXb234tZIBhwmxxTN1aXL1Pet7q6a5/8C</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/websockets/doc/broadcast_arch.drawio b/plugins/websockets/doc/broadcast_arch.drawio new file mode 100644 index 00000000..54bba9c7 --- /dev/null +++ b/plugins/websockets/doc/broadcast_arch.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-05-24T11:49:56.412Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="onoEtIi9b01eD9L0oL7y" version="14.5.1" type="device"><diagram id="uVz6nu4iiJ3Jh2FwnUZb" name="Page-1">zZSxboMwEEC/hrES4JAma9I0WSpVYkCdKgdfsVXDIWMK6dfXNEcAoUjtUKUT9rvz2X4ceGybt3vDS/mEArQX+qL12IMXhsE6WrlHR05EAnZ/JplRgtgAYvUJBH2itRJQTRItoraqnMIUiwJSO2HcGGymaW+op7uWPIMZiFOu5zRRwsr+YqE/BA6gMtlvHfoUyXmfTaCSXGAzQmznsa1BtOdR3m5Bd/p6Med1j1eil5MZKOxPFpS71cthFbVJvE+Xr4k0+IZ3VOWD65pu/FwftaokmK64rjNV0PHtqZdisC4EdGV9j20aqSzEJU+7aOP6wDFpc+1mgRvSBmAstFdPHlx8uFYCzMGak0uhBeGCFFIXsTXNm9Er6TXL0dtgxDh1QXYpPXhyA1L1C23hTNvGIBcpr+y/sxUFt7bFZrYSOFaYvsPtbS2WU1uL6O9suenwuX/HRr9NtvsC</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/websockets/doc/ws.drawio b/plugins/websockets/doc/ws.drawio new file mode 100644 index 00000000..739b797a --- /dev/null +++ b/plugins/websockets/doc/ws.drawio @@ -0,0 +1 @@ +<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go new file mode 100644 index 00000000..391c9a8c --- /dev/null +++ b/plugins/websockets/executor/executor.go @@ -0,0 +1,146 @@ +package executor + +import ( + "github.com/fasthttp/websocket" + json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/websockets/commands" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" +) + +type Response struct { + Topic string `json:"topic"` + Payload []string `json:"payload"` +} + +type Executor struct { + conn *connection.Connection + storage *storage.Storage + log logger.Logger + + // associated connection ID + connID string + pubsub pubsub.PubSub +} + +// NewExecutor creates protected connection and starts command loop +func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor { + return &Executor{ + conn: conn, + connID: connID, + storage: bst, + log: log, + pubsub: pubsubs, + } +} + +func (e *Executor) StartCommandLoop() error { + for { + mt, data, err := e.conn.Read() + if err != nil { + if mt == -1 { + return err + } + + return err + } + + msg := &pubsub.Msg{} + + err = json.Unmarshal(data, msg) + if err != nil { + e.log.Error("error unmarshal message", "error", err) + continue + } + + switch msg.Command() { + // handle leave + case commands.Join: + // TODO access validators model update + //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...) + //// validation error + //if err != nil { + // e.log.Error("validation error", "error", err) + // + // resp := &Response{ + // Topic: "#join", + // Payload: msg.Topics(), + // } + // + // packet, err := json.Marshal(resp) + // if err != nil { + // e.log.Error("error marshal the body", "error", err) + // return err + // } + // + // err = e.conn.Write(websocket.BinaryMessage, packet) + // if err != nil { + // e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + // continue + // } + // + // continue + //} + // associate connection with topics + e.storage.Store(e.connID, msg.Topics()) + + resp := &Response{ + Topic: "@join", + Payload: msg.Topics(), + } + + packet, err := json.Marshal(resp) + if err != nil { + e.log.Error("error marshal the body", "error", err) + continue + } + + err = e.conn.Write(websocket.BinaryMessage, packet) + if err != nil { + e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + continue + } + + err = e.pubsub.Subscribe(msg.Topics()...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) + continue + } + + // handle leave + case commands.Leave: + // remove associated connections from the storage + e.storage.Remove(e.connID, msg.Topics()) + + resp := &Response{ + Topic: "@leave", + Payload: msg.Topics(), + } + + packet, err := json.Marshal(resp) + if err != nil { + e.log.Error("error marshal the body", "error", err) + continue + } + + err = e.pubsub.Unsubscribe(msg.Topics()...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) + continue + } + + err = e.conn.Write(websocket.BinaryMessage, packet) + if err != nil { + e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + continue + } + + case commands.Headers: + + default: + e.log.Warn("unknown command", "command", msg.Command()) + } + } +} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go new file mode 100644 index 00000000..a247da69 --- /dev/null +++ b/plugins/websockets/plugin.go @@ -0,0 +1,206 @@ +package websockets + +import ( + "net/http" + "sync" + "time" + + "github.com/fasthttp/websocket" + "github.com/google/uuid" + endure "github.com/spiral/endure/pkg/container" + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/config" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/executor" + "github.com/spiral/roadrunner/v2/plugins/websockets/pool" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" +) + +const ( + PluginName string = "websockets" +) + +type Plugin struct { + sync.RWMutex + // Collection with all available pubsubs + pubsubs map[string]pubsub.PubSub + + Config *Config + log logger.Logger + + // global connections map + connections sync.Map + storage *storage.Storage + + workersPool *pool.WorkersPool +} + +func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { + const op = errors.Op("websockets_plugin_init") + if !cfg.Has(PluginName) { + return errors.E(op, errors.Disabled) + } + + err := cfg.UnmarshalKey(PluginName, &p.Config) + if err != nil { + return errors.E(op, err) + } + + p.pubsubs = make(map[string]pubsub.PubSub) + p.log = log + p.storage = storage.NewStorage() + p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log) + + return nil +} + +func (p *Plugin) Serve() chan error { + errCh := make(chan error) + go func() { + ps := p.pubsubs["redis"] + + for { + // get message + // get topic + // get connection uuid from the storage by the topic + // write payload into the connection + // do that in the workers pool + data, err := ps.Next() + if err != nil { + errCh <- err + return + } + + if data == nil { + continue + } + + p.workersPool.Queue(data) + } + }() + return errCh +} + +func (p *Plugin) Stop() error { + p.workersPool.Stop() + return nil +} + +func (p *Plugin) Collects() []interface{} { + return []interface{}{ + p.GetPublishers, + } +} + +func (p *Plugin) Available() {} + +func (p *Plugin) RPC() interface{} { + return &rpc{ + plugin: p, + log: p.log, + } +} + +func (p *Plugin) Name() string { + return PluginName +} + +// GetPublishers collects all pubsubs +func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) { + p.pubsubs[name.Name()] = pub +} + +func (p *Plugin) Middleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != p.Config.Path { + next.ServeHTTP(w, r) + return + } + + // connection upgrader + upgraded := websocket.Upgrader{ + HandshakeTimeout: time.Second * 60, + ReadBufferSize: 0, + WriteBufferSize: 0, + WriteBufferPool: nil, + Subprotocols: nil, + Error: nil, + CheckOrigin: nil, + EnableCompression: false, + } + + // upgrade connection to websocket connection + _conn, err := upgraded.Upgrade(w, r, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + safeConn := connection.NewConnection(_conn, p.log) + defer func() { + err = safeConn.Close() + if err != nil { + p.log.Error("connection close error", "error", err) + } + }() + + // generate UUID from the connection + connectionID := uuid.NewString() + // store connection + p.connections.Store(connectionID, safeConn) + // when exiting - delete the connection + defer func() { + p.connections.Delete(connectionID) + }() + + // Executor wraps a connection to have a safe abstraction + p.Lock() + e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"]) + p.Unlock() + + p.log.Info("websocket client connected", "uuid", connectionID) + + err = e.StartCommandLoop() + if err != nil { + p.log.Error("command loop error", "error", err.Error()) + return + } + }) +} + +func (p *Plugin) Publish(msg []pubsub.Message) error { + p.Lock() + defer p.Unlock() + + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics()); j++ { + if br, ok := p.pubsubs[msg[i].Broker()]; ok { + err := br.Publish(msg) + if err != nil { + return errors.E(err) + } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker()) + } + } + } + return nil +} + +func (p *Plugin) PublishAsync(msg []pubsub.Message) { + go func() { + p.Lock() + defer p.Unlock() + for i := 0; i < len(msg); i++ { + for j := 0; j < len(msg[i].Topics()); j++ { + err := p.pubsubs[msg[i].Broker()].Publish(msg) + if err != nil { + p.log.Error("publish async error", "error", err) + return + } + + } + } + }() +} diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go new file mode 100644 index 00000000..ee31d62f --- /dev/null +++ b/plugins/websockets/pool/workers_pool.go @@ -0,0 +1,104 @@ +package pool + +import ( + "sync" + + "github.com/fasthttp/websocket" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" +) + +type WorkersPool struct { + storage *storage.Storage + connections *sync.Map + resPool sync.Pool + log logger.Logger + + queue chan pubsub.Message + exit chan struct{} +} + +func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { + wp := &WorkersPool{ + connections: connections, + queue: make(chan pubsub.Message, 100), + storage: storage, + log: log, + exit: make(chan struct{}), + } + + wp.resPool.New = func() interface{} { + return make(map[string]struct{}, 10) + } + + for i := 0; i < 10; i++ { + wp.do() + } + + return wp +} + +func (wp *WorkersPool) Queue(msg pubsub.Message) { + wp.queue <- msg +} + +func (wp *WorkersPool) Stop() { + for i := 0; i < 10; i++ { + wp.exit <- struct{}{} + } + + close(wp.exit) +} + +func (wp *WorkersPool) put(res map[string]struct{}) { + // optimized + // https://go-review.googlesource.com/c/go/+/110055/ + // not O(n), but O(1) + for k := range res { + delete(res, k) + } +} + +func (wp *WorkersPool) get() map[string]struct{} { + return wp.resPool.Get().(map[string]struct{}) +} + +func (wp *WorkersPool) do() { + go func() { + for { + select { + case msg := <-wp.queue: + res := wp.get() + // get connections for the particular topic + wp.storage.Get(msg.Topics(), res) + if len(res) == 0 { + wp.log.Info("no such topic", "topic", msg.Topics()) + wp.put(res) + continue + } + + for i := range res { + c, ok := wp.connections.Load(i) + if !ok { + panic("not ok here (((") + } + + conn := c.(*connection.Connection) + err := conn.Write(websocket.BinaryMessage, msg.Payload()) + if err != nil { + // TODO handle error + wp.put(res) + continue + } + } + + wp.put(res) + case <-wp.exit: + wp.log.Info("get exit signal, exiting from the workers pool") + return + } + } + }() +} diff --git a/plugins/websockets/rpc.go b/plugins/websockets/rpc.go new file mode 100644 index 00000000..0f0303b7 --- /dev/null +++ b/plugins/websockets/rpc.go @@ -0,0 +1,46 @@ +package websockets + +import ( + "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" +) + +type rpc struct { + plugin *Plugin + log logger.Logger +} + +func (r *rpc) Publish(msg []*pubsub.Msg, ok *bool) error { + const op = errors.Op("broadcast_publish") + + // publish to the registered broker + mi := make([]pubsub.Message, 0, len(msg)) + // golang can't convert slice in-place + // so, we need to convert it manually + for i := 0; i < len(msg); i++ { + mi = append(mi, msg[i]) + } + err := r.plugin.Publish(mi) + if err != nil { + *ok = false + return errors.E(op, err) + } + *ok = true + return nil +} + +func (r *rpc) PublishAsync(msg []*pubsub.Msg, ok *bool) error { + // publish to the registered broker + mi := make([]pubsub.Message, 0, len(msg)) + // golang can't convert slice in-place + // so, we need to convert it manually + for i := 0; i < len(msg); i++ { + mi = append(mi, msg[i]) + } + + r.plugin.PublishAsync(mi) + + *ok = true + return nil +} diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go new file mode 100644 index 00000000..a7e49207 --- /dev/null +++ b/plugins/websockets/storage/storage.go @@ -0,0 +1,50 @@ +package storage + +import ( + "sync" + + "github.com/spiral/roadrunner/v2/plugins/memory/bst" +) + +type Storage struct { + sync.RWMutex + BST bst.Storage +} + +func NewStorage() *Storage { + return &Storage{ + BST: bst.NewBST(), + } +} + +func (s *Storage) Store(connID string, topics []string) { + s.Lock() + defer s.Unlock() + + for i := 0; i < len(topics); i++ { + s.BST.Insert(connID, topics[i]) + } +} + +func (s *Storage) Remove(connID string, topics []string) { + s.Lock() + defer s.Unlock() + + for i := 0; i < len(topics); i++ { + s.BST.Remove(connID, topics[i]) + } +} + +func (s *Storage) Get(topics []string, res map[string]struct{}) { + s.RLock() + defer s.RUnlock() + + for i := 0; i < len(topics); i++ { + d := s.BST.Get(topics[i]) + if len(d) > 0 { + for ii := range d { + res[ii] = struct{}{} + } + } + } +} diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go new file mode 100644 index 00000000..9d9522d4 --- /dev/null +++ b/plugins/websockets/validator/access_validator.go @@ -0,0 +1,102 @@ +package validator + +import ( + "bytes" + "io" + "net/http" + "strings" + + "github.com/spiral/roadrunner/v2/plugins/http/attributes" +) + +type AccessValidator struct { + buffer *bytes.Buffer + header http.Header + status int +} + +func NewValidator() *AccessValidator { + return &AccessValidator{ + buffer: bytes.NewBuffer(nil), + header: make(http.Header), + } +} + +// Copy all content to parent response writer. +func (w *AccessValidator) Copy(rw http.ResponseWriter) { + rw.WriteHeader(w.status) + + for k, v := range w.header { + for _, vv := range v { + rw.Header().Add(k, vv) + } + } + + _, _ = io.Copy(rw, w.buffer) +} + +// Header returns the header map that will be sent by WriteHeader. +func (w *AccessValidator) Header() http.Header { + return w.header +} + +// Write writes the data to the connection as part of an HTTP reply. +func (w *AccessValidator) Write(p []byte) (int, error) { + return w.buffer.Write(p) +} + +// WriteHeader sends an HTTP response header with the provided status code. +func (w *AccessValidator) WriteHeader(statusCode int) { + w.status = statusCode +} + +// IsOK returns true if response contained 200 status code. +func (w *AccessValidator) IsOK() bool { + return w.status == 200 +} + +// Body returns response body to rely to user. +func (w *AccessValidator) Body() []byte { + return w.buffer.Bytes() +} + +// Error contains server response. +func (w *AccessValidator) Error() string { + return w.buffer.String() +} + +// AssertServerAccess checks if user can join server and returns error and body if user can not. Must return nil in +// case of error +func (w *AccessValidator) AssertServerAccess(f http.HandlerFunc, r *http.Request) error { + if err := attributes.Set(r, "ws:joinServer", true); err != nil { + return err + } + + defer delete(attributes.All(r), "ws:joinServer") + + f(w, r) + + if !w.IsOK() { + return w + } + + return nil +} + +// AssertTopicsAccess checks if user can access given upstream, the application will receive all user headers and cookies. +// the decision to authorize user will be based on response code (200). +func (w *AccessValidator) AssertTopicsAccess(f http.HandlerFunc, r *http.Request, channels ...string) error { + if err := attributes.Set(r, "ws:joinTopics", strings.Join(channels, ",")); err != nil { + return err + } + + defer delete(attributes.All(r), "ws:joinTopics") + + f(w, r) + + if !w.IsOK() { + return w + } + + return nil +} diff --git a/plugins/websockets/validator/access_validator_test.go b/plugins/websockets/validator/access_validator_test.go new file mode 100644 index 00000000..4a07b00f --- /dev/null +++ b/plugins/websockets/validator/access_validator_test.go @@ -0,0 +1,35 @@ +package validator + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResponseWrapper_Body(t *testing.T) { + w := NewValidator() + _, _ = w.Write([]byte("hello")) + + assert.Equal(t, []byte("hello"), w.Body()) +} + +func TestResponseWrapper_Header(t *testing.T) { + w := NewValidator() + w.Header().Set("k", "value") + + assert.Equal(t, "value", w.Header().Get("k")) +} + +func TestResponseWrapper_StatusCode(t *testing.T) { + w := NewValidator() + w.WriteHeader(200) + + assert.True(t, w.IsOK()) +} + +func TestResponseWrapper_StatusCodeBad(t *testing.T) { + w := NewValidator() + w.WriteHeader(400) + + assert.False(t, w.IsOK()) +} |