summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/commands/enums.go9
-rw-r--r--plugins/websockets/config.go83
-rw-r--r--plugins/websockets/connection/connection.go67
-rw-r--r--plugins/websockets/doc/broadcast.drawio1
-rw-r--r--plugins/websockets/doc/doc.go27
-rw-r--r--plugins/websockets/executor/executor.go214
-rw-r--r--plugins/websockets/origin.go28
-rw-r--r--plugins/websockets/origin_test.go73
-rw-r--r--plugins/websockets/plugin.go370
-rw-r--r--plugins/websockets/pool/workers_pool.go135
-rw-r--r--plugins/websockets/validator/access_validator.go81
-rw-r--r--plugins/websockets/wildcard.go12
12 files changed, 0 insertions, 1100 deletions
diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go
deleted file mode 100644
index 18c63be3..00000000
--- a/plugins/websockets/commands/enums.go
+++ /dev/null
@@ -1,9 +0,0 @@
-package commands
-
-type Command string
-
-const (
- Leave string = "leave"
- Join string = "join"
- Headers string = "headers"
-)
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
deleted file mode 100644
index 933a12e0..00000000
--- a/plugins/websockets/config.go
+++ /dev/null
@@ -1,83 +0,0 @@
-package websockets
-
-import (
- "strings"
- "time"
-
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pool"
-)
-
-/*
-websockets:
- broker: default
- allowed_origin: "*"
- path: "/ws"
-*/
-
-// Config represents configuration for the ws plugin
-type Config struct {
- // http path for the websocket
- Path string `mapstructure:"path"`
- AllowedOrigin string `mapstructure:"allowed_origin"`
- Broker string `mapstructure:"broker"`
-
- // wildcard origin
- allowedWOrigins []wildcard
- allowedOrigins []string
- allowedAll bool
-
- // Pool with the workers for the websockets
- Pool *pool.Config `mapstructure:"pool"`
-}
-
-// InitDefault initialize default values for the ws config
-func (c *Config) InitDefault() error {
- if c.Path == "" {
- c.Path = "/ws"
- }
-
- // broker is mandatory
- if c.Broker == "" {
- return errors.Str("broker key should be specified")
- }
-
- if c.Pool == nil {
- c.Pool = &pool.Config{}
- if c.Pool.NumWorkers == 0 {
- // 2 workers by default
- c.Pool.NumWorkers = 2
- }
-
- if c.Pool.AllocateTimeout == 0 {
- c.Pool.AllocateTimeout = time.Minute
- }
-
- if c.Pool.DestroyTimeout == 0 {
- c.Pool.DestroyTimeout = time.Minute
- }
- if c.Pool.Supervisor != nil {
- c.Pool.Supervisor.InitDefaults()
- }
- }
-
- if c.AllowedOrigin == "" {
- c.AllowedOrigin = "*"
- }
-
- // Normalize
- origin := strings.ToLower(c.AllowedOrigin)
- if origin == "*" {
- // If "*" is present in the list, turn the whole list into a match all
- c.allowedAll = true
- return nil
- } else if i := strings.IndexByte(origin, '*'); i >= 0 {
- // Split the origin in two: start and end string without the *
- w := wildcard{origin[0:i], origin[i+1:]}
- c.allowedWOrigins = append(c.allowedWOrigins, w)
- } else {
- c.allowedOrigins = append(c.allowedOrigins, origin)
- }
-
- return nil
-}
diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go
deleted file mode 100644
index 04c29d83..00000000
--- a/plugins/websockets/connection/connection.go
+++ /dev/null
@@ -1,67 +0,0 @@
-package connection
-
-import (
- "sync"
-
- "github.com/fasthttp/websocket"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-// Connection represents wrapped and safe to use from the different threads websocket connection
-type Connection struct {
- sync.RWMutex
- log logger.Logger
- conn *websocket.Conn
-}
-
-func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection {
- return &Connection{
- conn: wsConn,
- log: log,
- }
-}
-
-func (c *Connection) Write(data []byte) error {
- c.Lock()
- defer c.Unlock()
-
- const op = errors.Op("websocket_write")
- // handle a case when a goroutine tried to write into the closed connection
- defer func() {
- if r := recover(); r != nil {
- c.log.Warn("panic handled, tried to write into the closed connection")
- }
- }()
-
- err := c.conn.WriteMessage(websocket.TextMessage, data)
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
-
-func (c *Connection) Read() (int, []byte, error) {
- const op = errors.Op("websocket_read")
-
- mt, data, err := c.conn.ReadMessage()
- if err != nil {
- return -1, nil, errors.E(op, err)
- }
-
- return mt, data, nil
-}
-
-func (c *Connection) Close() error {
- c.Lock()
- defer c.Unlock()
- const op = errors.Op("websocket_close")
-
- err := c.conn.Close()
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
-}
diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio
deleted file mode 100644
index 230870f2..00000000
--- a/plugins/websockets/doc/broadcast.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2021-05-27T20:56:56.848Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="Pt0MY_-SPz7R7foQA1VL" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">7V1Zc9rIFv411HUeULV28WgwTjxjx06wJ8l9mRKoAU2ExEjCS3797W4t9IbAoMZLrqcqg1oLrT77OV8fOuZg8fgx9ZfzqySAUccAwWPHPOsYhmECC/0PjzwVI7ppesXILA2Dcmw9MAp/wXIQlKOrMIAZc2GeJFEeLtnBSRLHcJIzY36aJg/sZdMkYr916c+gMDCa+JE4+i0M8nkxagIA1ic+wXA2z/kzC7+6uhzI5n6QPFBD5rBjDtIkyYtPi8cBjPDyVQtT3He+4Ww9sxTG+S43PFlXl7c/+n48uTWjn1/SEMSn3V45t/ypemMYoAUoD5M0nyezJPaj4Xq0vx69TJIlukxHg//APH8q6eev8gQNzfNFVJ6Fj2H+nfr8A30GmmGXh2eYYUB18FQdxHn69J0+oG/Dx+v7yFF1o7gy5WJlySqdwIblqFjMT2cwb7qupCleLOobyoX/CJMFRBNCF6Qw8vPwnuUmv2TKWX1dfetNEqI5G6CUIN2y7eKeSoB0y2AfUsy1vG9NfvSBmsh6iDCFnEGuvFUc/53d9od/zbI/xr9On7y7rm7swSFpsooDGJTkOIBfKB75QZ3axi8Ui/xgOETOLxlaw/wUaws0ECcxrMbOQ7xYFE9x9FbLZNaBLMWwwrPp7hQPvvejVflVlpQTLv0xUvoM9fwonMXo8wQtDUzRwD1M8xAp1dPyxCIMgoJRYBb+8sfkeXiVl5iNyavY/Y59Vq87fgB8ZN641PjlzWs1S1OkgaPFdS0f3wUacD2PkbqKZPtKc/WYHntHMp1mUInUms7LSq1LSy3YUWpZLe9u0/IvL7XSpTeclxTbyjpQYvsV+gEa+Tz8fnvyATsmMMuw14O8oTRZ4Fea44PlapytxnhOMb48g3GQ4W/LictVX/aQpD9his8skR/2PjSCuYFk5eOBpjuGyyoE4zCFUOka1+H0jHssDWHrAqPIdcZ7oybS77Zlm+y6m62Qk+UR3VNAyybVRJHy27A/uh78ObwdofFPd2iVwclo+PWvIfpwfvd58EEg9cM8zOFo6RMd+ICiOZbkU6RTB0mUpORqM/ChN50QhZsmPyF1xpl4cDzFdyRxTo0D8tfknAs8sJGEumFxXrFhlwL5sA7SdKvkmDkVnzlAlb/0igzuzm7ys7zkFm2psaMt1e2XtKWiBzzy7+FaNYVJXJvLB58Yyili98pSjtPEDyZ+lq9tbvY+NGyzdALNNAxWPtsyl6brauyTLbsaOILJNAWG8H4HgiKLaRkeR9EDI6Ly0Y6hWazVdLRqmdUT1Ph/buOlNPvB+bKDNLttCZIsT4S+OUk2mpMbiHt6wFChm3X2qci3UJHsaOI52v3Fbu8VocMDWjGBtM9zdqc2/k/q7JI/0dkt/tpydh1W9TpeyQy0rwskvq6lzNcFwooeUW/uWUOgEkq1ut1Vb04iP8vCCac69XZVpyWqzs3R3qGVBvRm/hN1QambNos3qLiuruSVeuR89ztc0HyHaTffgT4U8243V/rCXsDuTsAutnxvR+GtcrKYhelxXGT1elqP+uMChg0FtOdKSM/hmbfXzO7CPJ99g+5tESiDF0HuDjUCVUkxZZDt9+Fdmc3eFQqUXNdhS0ftZBaRyXW0qgBXh2A69xx1LpboL38dnl2M8PcY4OJz92p4df31x6F+1hQ6E2lSMXB7Y5I8VJhU7AFOVoyeJKdoKPKzTOvvs+ki/1N/7D1G/t/9weWPv8r0F7Ps5h626hDrxPha7o6+ls7Yne2VvP0tTeWJ7mZqBPPWVPKxqpCw4gf7MEk+QhlA1Ltfb9CjQP9udKBoQj2woSsTzZ7jmr4kBGo13+/slO+XyWYb+f7B2Y9o8V83eFwtT/8MgssvZ3nW3afAvo8g7pmsf369ZichlC7FrngWY0cZbD0HJJ21JwjLoMQQMvXuj9d4ynGwLMX4hCmaI09iGs7EatrRvBuaXk18urMX1OTcOI7L6sS28vo9U+PTy0dLAleahGIE9+Xouae3upn0DSbO0S2dXXWrFXLqntbj4BK6oTmGAoo26SOKoAI9/WxZoIWn4SOO4mmiiqQRyBwuCGyY2LxSdevGevwsXMzQzKNwjP71f61SiF9yBmOY+mj2532MS4aplt3PWjKQllAYs3UxS+jVF9Em0lXmvu6TWGktlbIr6kx7Vlpwfytp7mgl24cWH0REQ3Qq5WQ9urlr5LnD7R3QDNNWUvTk4gqkGlWUSpqYkCLmzV3/8mL0SSDpM+MDc+IBIPMv+6Bny0L3KflrKz5wdI2DHCDbtqsCNJUpQNES7QMTeuUqcfcK8z5q0xbVZhNn7x7gG5xog31k+9llFBfwbFr6RRtzuNvuYHO4ivINTaShlcmnmwMViQ9swwikgeqZfq5ekfCJBssVtYg00dCGEpGuck9Y5S46vFvOUj8QwGgkcH1ADOhEaF79cYo+zfAnfE+JXitx3HCcJZOfmDOYR0hvxNJMrlssCrhbhBURFhY/COMZfu3iRWJ0STGwRryxMHPZ5ASeQeTK5a5EBKf5YY4EVpZZ6bO3xDWmy1Xoq2OKaUxHwjS2MssjMA2BRAyiEJZVrmeIaBtLZLlsWcSWgBg894i2WRSrii+XzOI4/67wBs3+AhmgELHaKToLlo/oX7IyoBjv5tgw43MWdQ6zcbfkXHyu9IOZ0wGcJKlPZI9cg8vKaRQiK1p/9VpMSsGpBq7qnRxIW64m+QpDXYpr0JqM+fvQ2JIfm6f8yF6vX4glPunhk/zMb5NlOCH3nBWTJTpi86xUzKGP7UkqTAKcpDAIkVIaEKW1SNKnD6KGuvGfooRsoynuTv0H9O/4KYdSdXbcNxvUavnkH6TuileJINH2AyJPyFKk2Qf5vDhdwGtWwti8+k3QVdOIOHxT4uQJmRB8fO4vwgjL+ycY3UP81Pb1isd7R6YHtCp5SKtfmc32epprK9IvkmKM6PrHAec3SzP/Haaatq2YJodJNVfu9nfPJVmNJnW71T2nSGY3GMxDM5Suw/t5vCku3lxAhUiSnTr/KMA9SvEO7X1KworAePvt53/WRs+2oXhyFK3I1ZsrWkfIzMm3egoKZhgHxAVAUkRunyRRVBa05tjEgQUFiS03rsDHZZLhwyJ+ID76PM+xnVpGK2yVXizZ1ypyp9GMdIGGTQQjx23VtnTd1mzu0dXxEbY4CEzy6fYWBeng5vLu48VngbisYtgSGAgk56vONvQCSxbMe8bYJMDpjZQWxHazDwAqW1uHXzvG7Mo2A7rCql8slhFcQJIcAjer8YhstQ7xuk3x8u4eAbchX9IY+CAiiGBGS0IEV0IEQxURJNlXtPBRmM07TIV/XAYGb0zNbRMKE2gAWJZnuLbnGIDbm6yO9y051vNV+yPPw9ao21Il52MJnFpN54n9UtoOJ/qeswWVLNywBfeMXOxe0w1qUMyWuP13UOdJL87eyVZea8MO67VjZFoWB/ppCSUCND5teiynyBbt8waswjukJ+C38raEUDcNjQuqu0drZqKLsdAn5GF0l8i7StIFCtqwoe+PbgmFk7RImj6E+bz2AnKcoszqsAgFmckk9HMYdOj6iEzuW3WZp94EyjHwY8/GSYp2XGabQzZ45ku7zJYcdPfKG2i4r9pxkNTOX2PLKhGJ+5GUJf1JvsJ4OfCQdZtKlOj//gLLWTzOSLK66MiRIkEkK1el7pG+Kr3+xTtR9u42ZW/x+7LbQTDZumZ5ki11VVMOXVPR+0huy8VCXmUNfgPymlzyua3N/JZmHY2Ckl1Ot3OMLOhm/pSkI/1s3l34S95k/4RP2GB3RQtdGnHyxOoSRokotuIO8Hu6dLuMceY6O6BYDrPurskGWK4n4hHqDsttQ4HlNBY97nUupm5I+MZzYLrt8sVI2TYl2cKrS4KJxrXYEzZeETnJc38yJ04u3eORtHeUZ//fHFEsfl+nqYtEMWT9M9QRRbRZBVEmBYyKqKsKn5VptaS8eVK4HCks3daALoqId0xqSBDulY8ZhPc8gCNb+nE1RjBVlZwgN/snMTZsVplFv617r57UeJRpCKOABmfQX0ENs5OReLwYyJJW9T4/Z76Okm5K5jekvo8OvzuIrTyhLUJPDGZNU8JSbYDL5NXGV1STV9Vk/8jBbHXh9p6RL9o23ZRkopB0Rljw/rjGBVes5y+Hp6QVawmolWDIRnf90eDrRR9ddX73mTp6D4GNubUajzxotkPvgZvvyycbKK5htzzyKkBhoCqGORv6aL03egJNdw122VtquNw1WC7Rdc1V0ROlUSPRAJzvw8Hd7fVXUZ7LJuwUKv6kEn7ck73A06sMSfWxr0NDFpIC4AxPzxWHpHzHZk9io6Vun7KEsyHW99RSwEdBhiFvI3he9spQmhSwPb7gptc/VvRiaX9DTAy8byo4EipIdioemQpiluB9U8GVUMF4cSrI9qS8Zyp4Eiq8eCHSfOFC5H5NoRXt2G0ldKtI+spDN0vcHF/UIetWP0yJge0DtMpyclhgDMZPHQZXIOWot+fmb61H2U6V8D7YsXc1dsOgfrxATXQMKyjpb0BBg2vx1E7BuKv3+Mi7q6R7pfy15T/T8KqxJVwPxVeWjdtdpR/6Cy77YVI9j+PiyofYhDE1gJBFfu4dHnuDIlSqmF8sjBSHkWHNE1VBp0zTMk3uw2AHhNvb1HXeNl3nuNxvj7Rju1yDZSTjaIZLjOJ0eSHiHRLTtjwFtOxSP2RR2a2jAWMqJfvbxIM9STwo20Z03HhQTPPyv4EJTj5eiz1A31x53uWQEj3ZPrqjolcsMSNyCWcwDkg3hE19F0ihXNJlgW/HQYrZG7ts0FV4XeuQXXs8YIm1sqT7EfCXy+fW8hXN2sCzptA+r2JSJp7UunFJhVngIUg1fAI36gynIT7PYh7C9X7lRTbrUEDfV/GeFssyslejO4+EcZdrPnLE+dt4rt9qwFGHArVgHOYa1LKJ8av9rJ/R15W/MYzCpYD1MSnPs/QDyHCFg1H3eg7LcstVNicVuDUevFP9rvF/KKX+ZQVXNXRmXDrME/QYGKicrYtnWzVqqh1z0qqGmi6BHq3bUqubjkemQ0cWqyVpPcYJ7EPJyKs4/Jcs293dxRmxCRn6hvXV+Gcy8XUFZIn8aGZNEOpxVMRykq/RwB9UvmoPv+rwEU6QaU1L3u8YjZ3MJK3TVMxMB3hq59SPi27EkjBzIhDPlIC+AkKx8xQpmfuaGufMC/VHt91qG5TCd8HGlFXlWIVcX18Wd90U0jfjOtMVSVd59Frv3eJfR+l7FOYV5qs0Ltmd4+FCjyicAbGlTXtlhDXJBWA9kTGFcyR28FvDXhyeYeVzeeO+NY98le23s3sy15rPe+/gW2MxTzAV1sEoCgXnV0mAg/7h/wA=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/doc.go b/plugins/websockets/doc/doc.go
deleted file mode 100644
index fc214be8..00000000
--- a/plugins/websockets/doc/doc.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package doc
-
-/*
-RPC message structure:
-
-type Msg struct {
- // Topic message been pushed into.
- Topics_ []string `json:"topic"`
-
- // Command (join, leave, headers)
- Command_ string `json:"command"`
-
- // Broker (redis, memory)
- Broker_ string `json:"broker"`
-
- // Payload to be broadcasted
- Payload_ []byte `json:"payload"`
-}
-
-1. Topics - string array (slice) with topics to join or leave
-2. Command - string, command to apply on the provided topics
-3. Broker - string, pub-sub broker to use, for the one-node systems might be used `memory` broker or `redis`. For the multi-node -
-`redis` broker should be used.
-4. Payload - raw byte array to send to the subscribers (binary messages).
-
-
-*/
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
deleted file mode 100644
index c1f79a78..00000000
--- a/plugins/websockets/executor/executor.go
+++ /dev/null
@@ -1,214 +0,0 @@
-package executor
-
-import (
- "fmt"
- "net/http"
- "sync"
-
- json "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/websockets/commands"
- "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
-)
-
-type Response struct {
- Topic string `json:"topic"`
- Payload []string `json:"payload"`
-}
-
-type Executor struct {
- sync.Mutex
- // raw ws connection
- conn *connection.Connection
- log logger.Logger
-
- // associated connection ID
- connID string
-
- // subscriber drivers
- sub pubsub.Subscriber
- actualTopics map[string]struct{}
-
- req *http.Request
- accessValidator validator.AccessValidatorFn
-}
-
-// NewExecutor creates protected connection and starts command loop
-func NewExecutor(conn *connection.Connection, log logger.Logger,
- connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
- return &Executor{
- conn: conn,
- connID: connID,
- log: log,
- sub: sub,
- accessValidator: av,
- actualTopics: make(map[string]struct{}, 10),
- req: r,
- }
-}
-
-func (e *Executor) StartCommandLoop() error { //nolint:gocognit
- const op = errors.Op("executor_command_loop")
- for {
- mt, data, err := e.conn.Read()
- if err != nil {
- if mt == -1 {
- e.log.Info("socket was closed", "reason", err, "message type", mt)
- return nil
- }
-
- return errors.E(op, err)
- }
-
- msg := &websocketsv1.Message{}
-
- err = json.Unmarshal(data, msg)
- if err != nil {
- e.log.Error("unmarshal message", "error", err)
- continue
- }
-
- // nil message, continue
- if msg == nil {
- e.log.Warn("nil message, skipping")
- continue
- }
-
- switch msg.Command {
- // handle leave
- case commands.Join:
- e.log.Debug("received join command", "msg", msg)
-
- val, err := e.accessValidator(e.req, msg.Topics...)
- if err != nil {
- if val != nil {
- e.log.Debug("validation error", "status", val.Status, "headers", val.Header, "body", val.Body)
- }
-
- resp := &Response{
- Topic: "#join",
- Payload: msg.Topics,
- }
-
- packet, errJ := json.Marshal(resp)
- if errJ != nil {
- e.log.Error("marshal the body", "error", errJ)
- return errors.E(op, fmt.Errorf("%v,%v", err, errJ))
- }
-
- errW := e.conn.Write(packet)
- if errW != nil {
- e.log.Error("write payload to the connection", "payload", packet, "error", errW)
- return errors.E(op, fmt.Errorf("%v,%v", err, errW))
- }
-
- continue
- }
-
- resp := &Response{
- Topic: "@join",
- Payload: msg.Topics,
- }
-
- packet, err := json.Marshal(resp)
- if err != nil {
- e.log.Error("marshal the body", "error", err)
- return errors.E(op, err)
- }
-
- err = e.conn.Write(packet)
- if err != nil {
- e.log.Error("write payload to the connection", "payload", packet, "error", err)
- return errors.E(op, err)
- }
-
- // subscribe to the topic
- err = e.Set(msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
-
- // handle leave
- case commands.Leave:
- e.log.Debug("received leave command", "msg", msg)
-
- // prepare response
- resp := &Response{
- Topic: "@leave",
- Payload: msg.Topics,
- }
-
- packet, err := json.Marshal(resp)
- if err != nil {
- e.log.Error("marshal the body", "error", err)
- return errors.E(op, err)
- }
-
- err = e.conn.Write(packet)
- if err != nil {
- e.log.Error("write payload to the connection", "payload", packet, "error", err)
- return errors.E(op, err)
- }
-
- err = e.Leave(msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
-
- case commands.Headers:
-
- default:
- e.log.Warn("unknown command", "command", msg.Command)
- }
- }
-}
-
-func (e *Executor) Set(topics []string) error {
- // associate connection with topics
- err := e.sub.Subscribe(e.connID, topics...)
- if err != nil {
- e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error())
- // in case of error, unsubscribe connection from the dead topics
- _ = e.sub.Unsubscribe(e.connID, topics...)
- return err
- }
-
- // save topics for the connection
- for i := 0; i < len(topics); i++ {
- e.actualTopics[topics[i]] = struct{}{}
- }
-
- return nil
-}
-
-func (e *Executor) Leave(topics []string) error {
- // remove associated connections from the storage
- err := e.sub.Unsubscribe(e.connID, topics...)
- if err != nil {
- e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error())
- return err
- }
-
- // remove topics for the connection
- for i := 0; i < len(topics); i++ {
- delete(e.actualTopics, topics[i])
- }
-
- return nil
-}
-
-func (e *Executor) CleanUp() {
- // unsubscribe particular connection from the topics
- for topic := range e.actualTopics {
- _ = e.sub.Unsubscribe(e.connID, topic)
- }
-
- // clean up the actualTopics data
- for k := range e.actualTopics {
- delete(e.actualTopics, k)
- }
-}
diff --git a/plugins/websockets/origin.go b/plugins/websockets/origin.go
deleted file mode 100644
index c6d9c9b8..00000000
--- a/plugins/websockets/origin.go
+++ /dev/null
@@ -1,28 +0,0 @@
-package websockets
-
-import (
- "strings"
-)
-
-func isOriginAllowed(origin string, cfg *Config) bool {
- if cfg.allowedAll {
- return true
- }
-
- origin = strings.ToLower(origin)
- // simple case
- origin = strings.ToLower(origin)
- for _, o := range cfg.allowedOrigins {
- if o == origin {
- return true
- }
- }
- // check wildcards
- for _, w := range cfg.allowedWOrigins {
- if w.match(origin) {
- return true
- }
- }
-
- return false
-}
diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go
deleted file mode 100644
index bbc49bbb..00000000
--- a/plugins/websockets/origin_test.go
+++ /dev/null
@@ -1,73 +0,0 @@
-package websockets
-
-import (
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestConfig_Origin(t *testing.T) {
- cfg := &Config{
- AllowedOrigin: "*",
- Broker: "any",
- }
-
- err := cfg.InitDefault()
- assert.NoError(t, err)
-
- assert.True(t, isOriginAllowed("http://some.some.some.sssome", cfg))
- assert.True(t, isOriginAllowed("http://", cfg))
- assert.True(t, isOriginAllowed("http://google.com", cfg))
- assert.True(t, isOriginAllowed("ws://*", cfg))
- assert.True(t, isOriginAllowed("*", cfg))
- assert.True(t, isOriginAllowed("you are bad programmer", cfg)) // True :(
- assert.True(t, isOriginAllowed("****", cfg))
- assert.True(t, isOriginAllowed("asde!@#!!@#!%", cfg))
- assert.True(t, isOriginAllowed("http://*.domain.com", cfg))
-}
-
-func TestConfig_OriginWildCard(t *testing.T) {
- cfg := &Config{
- AllowedOrigin: "https://*my.site.com",
- Broker: "any",
- }
-
- err := cfg.InitDefault()
- assert.NoError(t, err)
-
- assert.True(t, isOriginAllowed("https://my.site.com", cfg))
- assert.False(t, isOriginAllowed("http://", cfg))
- assert.False(t, isOriginAllowed("http://google.com", cfg))
- assert.False(t, isOriginAllowed("ws://*", cfg))
- assert.False(t, isOriginAllowed("*", cfg))
- assert.False(t, isOriginAllowed("you are bad programmer", cfg)) // True :(
- assert.False(t, isOriginAllowed("****", cfg))
- assert.False(t, isOriginAllowed("asde!@#!!@#!%", cfg))
- assert.False(t, isOriginAllowed("http://*.domain.com", cfg))
-
- assert.False(t, isOriginAllowed("https://*site.com", cfg))
- assert.True(t, isOriginAllowed("https://some.my.site.com", cfg))
-}
-
-func TestConfig_OriginWildCard2(t *testing.T) {
- cfg := &Config{
- AllowedOrigin: "https://my.*.com",
- Broker: "any",
- }
-
- err := cfg.InitDefault()
- assert.NoError(t, err)
-
- assert.True(t, isOriginAllowed("https://my.site.com", cfg))
- assert.False(t, isOriginAllowed("http://", cfg))
- assert.False(t, isOriginAllowed("http://google.com", cfg))
- assert.False(t, isOriginAllowed("ws://*", cfg))
- assert.False(t, isOriginAllowed("*", cfg))
- assert.False(t, isOriginAllowed("you are bad programmer", cfg)) // True :(
- assert.False(t, isOriginAllowed("****", cfg))
- assert.False(t, isOriginAllowed("asde!@#!!@#!%", cfg))
- assert.False(t, isOriginAllowed("http://*.domain.com", cfg))
-
- assert.False(t, isOriginAllowed("https://*site.com", cfg))
- assert.True(t, isOriginAllowed("https://my.bad.com", cfg))
-}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
deleted file mode 100644
index 395b056f..00000000
--- a/plugins/websockets/plugin.go
+++ /dev/null
@@ -1,370 +0,0 @@
-package websockets
-
-import (
- "context"
- "net/http"
- "sync"
- "time"
-
- "github.com/fasthttp/websocket"
- "github.com/google/uuid"
- json "github.com/json-iterator/go"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/payload"
- phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
- "github.com/spiral/roadrunner/v2/pkg/state/process"
- "github.com/spiral/roadrunner/v2/pkg/worker"
- "github.com/spiral/roadrunner/v2/plugins/broadcast"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/http/attributes"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/server"
- "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/plugins/websockets/executor"
- "github.com/spiral/roadrunner/v2/plugins/websockets/pool"
- "github.com/spiral/roadrunner/v2/plugins/websockets/validator"
-)
-
-const (
- PluginName string = "websockets"
-
- RrMode string = "RR_MODE"
- RrBroadcastPath string = "RR_BROADCAST_PATH"
-)
-
-type Plugin struct {
- sync.RWMutex
-
- // subscriber+reader interfaces
- subReader pubsub.SubReader
- // broadcaster
- broadcaster broadcast.Broadcaster
-
- cfg *Config
- log logger.Logger
-
- // global connections map
- connections sync.Map
-
- // GO workers pool
- workersPool *pool.WorkersPool
-
- wsUpgrade *websocket.Upgrader
- serveExit chan struct{}
-
- // workers pool
- phpPool phpPool.Pool
- // server which produces commands to the pool
- server server.Server
-
- // stop receiving messages
- cancel context.CancelFunc
- ctx context.Context
-
- // function used to validate access to the requested resource
- accessValidator validator.AccessValidatorFn
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error {
- const op = errors.Op("websockets_plugin_init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
-
- err := cfg.UnmarshalKey(PluginName, &p.cfg)
- if err != nil {
- return errors.E(op, err)
- }
-
- err = p.cfg.InitDefault()
- if err != nil {
- return errors.E(op, err)
- }
-
- p.wsUpgrade = &websocket.Upgrader{
- HandshakeTimeout: time.Second * 60,
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- CheckOrigin: func(r *http.Request) bool {
- return isOriginAllowed(r.Header.Get("Origin"), p.cfg)
- },
- }
- p.serveExit = make(chan struct{})
- p.server = server
- p.log = log
- p.broadcaster = b
-
- ctx, cancel := context.WithCancel(context.Background())
- p.ctx = ctx
- p.cancel = cancel
- return nil
-}
-
-func (p *Plugin) Serve() chan error {
- const op = errors.Op("websockets_plugin_serve")
- errCh := make(chan error, 1)
- // init broadcaster
- var err error
- p.subReader, err = p.broadcaster.GetDriver(p.cfg.Broker)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- go func() {
- var err error
- p.Lock()
- defer p.Unlock()
-
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
- Debug: p.cfg.Pool.Debug,
- NumWorkers: p.cfg.Pool.NumWorkers,
- MaxJobs: p.cfg.Pool.MaxJobs,
- AllocateTimeout: p.cfg.Pool.AllocateTimeout,
- DestroyTimeout: p.cfg.Pool.DestroyTimeout,
- Supervisor: p.cfg.Pool.Supervisor,
- }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path})
- if err != nil {
- errCh <- errors.E(op, err)
- return
- }
-
- p.accessValidator = p.defaultAccessValidator(p.phpPool)
- }()
-
- p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log)
-
- // we need here only Reader part of the interface
- go func(ps pubsub.Reader) {
- for {
- data, err := ps.Next(p.ctx)
- if err != nil {
- if errors.Is(errors.TimeOut, err) {
- return
- }
-
- errCh <- errors.E(op, err)
- return
- }
-
- p.workersPool.Queue(data)
- }
- }(p.subReader)
-
- return errCh
-}
-
-func (p *Plugin) Stop() error {
- // close workers pool
- p.workersPool.Stop()
- // cancel context
- p.cancel()
- p.Lock()
- if p.phpPool == nil {
- p.Unlock()
- return nil
- }
- p.phpPool.Destroy(context.Background())
- p.Unlock()
-
- return nil
-}
-
-func (p *Plugin) Available() {}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Middleware(next http.Handler) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if r.URL.Path != p.cfg.Path {
- next.ServeHTTP(w, r)
- return
- }
-
- // we need to lock here, because accessValidator might not be set in the Serve func at the moment
- p.RLock()
- // before we hijacked connection, we still can write to the response headers
- val, err := p.accessValidator(r)
- p.RUnlock()
- if err != nil {
- p.log.Error("access validation")
- w.WriteHeader(400)
- return
- }
-
- if val.Status != http.StatusOK {
- for k, v := range val.Header {
- for i := 0; i < len(v); i++ {
- w.Header().Add(k, v[i])
- }
- }
- w.WriteHeader(val.Status)
- _, _ = w.Write(val.Body)
- return
- }
-
- // upgrade connection to websocket connection
- _conn, err := p.wsUpgrade.Upgrade(w, r, nil)
- if err != nil {
- // connection hijacked, do not use response.writer or request
- p.log.Error("upgrade connection", "error", err)
- return
- }
-
- // construct safe connection protected by mutexes
- safeConn := connection.NewConnection(_conn, p.log)
- // generate UUID from the connection
- connectionID := uuid.NewString()
- // store connection
- p.connections.Store(connectionID, safeConn)
-
- // Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, connectionID, p.subReader, p.accessValidator, r)
- p.log.Info("websocket client connected", "uuid", connectionID)
-
- err = e.StartCommandLoop()
- if err != nil {
- p.log.Error("command loop error, disconnecting", "error", err.Error())
- return
- }
-
- // when exiting - delete the connection
- p.connections.Delete(connectionID)
-
- // remove connection from all topics from all pub-sub drivers
- e.CleanUp()
-
- err = r.Body.Close()
- if err != nil {
- p.log.Error("body close", "error", err)
- }
-
- // close the connection on exit
- err = safeConn.Close()
- if err != nil {
- p.log.Error("connection close", "error", err)
- }
-
- safeConn = nil
- p.log.Info("disconnected", "connectionID", connectionID)
- })
-}
-
-// Workers returns slice with the process states for the workers
-func (p *Plugin) Workers() []*process.State {
- p.RLock()
- defer p.RUnlock()
-
- workers := p.workers()
-
- ps := make([]*process.State, 0, len(workers))
- for i := 0; i < len(workers); i++ {
- state, err := process.WorkerProcessState(workers[i])
- if err != nil {
- return nil
- }
- ps = append(ps, state)
- }
-
- return ps
-}
-
-// internal
-func (p *Plugin) workers() []worker.BaseProcess {
- return p.phpPool.Workers()
-}
-
-// Reset destroys the old pool and replaces it with new one, waiting for old pool to die
-func (p *Plugin) Reset() error {
- p.Lock()
- defer p.Unlock()
- const op = errors.Op("ws_plugin_reset")
- p.log.Info("WS plugin got restart request. Restarting...")
- p.phpPool.Destroy(context.Background())
- p.phpPool = nil
-
- var err error
- p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{
- Debug: p.cfg.Pool.Debug,
- NumWorkers: p.cfg.Pool.NumWorkers,
- MaxJobs: p.cfg.Pool.MaxJobs,
- AllocateTimeout: p.cfg.Pool.AllocateTimeout,
- DestroyTimeout: p.cfg.Pool.DestroyTimeout,
- Supervisor: p.cfg.Pool.Supervisor,
- }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path})
- if err != nil {
- return errors.E(op, err)
- }
-
- // attach validators
- p.accessValidator = p.defaultAccessValidator(p.phpPool)
-
- p.log.Info("WS plugin successfully restarted")
- return nil
-}
-
-func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
- return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
- const op = errors.Op("access_validator")
-
- p.log.Debug("validation", "topics", topics)
- r = attributes.Init(r)
-
- // if channels len is eq to 0, we use serverValidator
- if len(topics) == 0 {
- ctx, err := validator.ServerAccessValidator(r)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- val, err := exec(ctx, pool)
- if err != nil {
- return nil, errors.E(err)
- }
-
- return val, nil
- }
-
- ctx, err := validator.TopicsAccessValidator(r, topics...)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- val, err := exec(ctx, pool)
- if err != nil {
- return nil, errors.E(op)
- }
-
- if val.Status != http.StatusOK {
- return val, errors.E(op, errors.Errorf("access forbidden, code: %d", val.Status))
- }
-
- return val, nil
- }
-}
-
-func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) {
- const op = errors.Op("exec")
- pd := &payload.Payload{
- Context: ctx,
- }
-
- resp, err := pool.Exec(pd)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- val := &validator.AccessValidator{
- Body: resp.Body,
- }
-
- err = json.Unmarshal(resp.Context, val)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return val, nil
-}
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
deleted file mode 100644
index 758620f6..00000000
--- a/plugins/websockets/pool/workers_pool.go
+++ /dev/null
@@ -1,135 +0,0 @@
-package pool
-
-import (
- "sync"
-
- json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/common/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- "github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/utils"
-)
-
-type WorkersPool struct {
- subscriber pubsub.Subscriber
- connections *sync.Map
- resPool sync.Pool
- log logger.Logger
-
- queue chan *pubsub.Message
- exit chan struct{}
-}
-
-// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool {
- wp := &WorkersPool{
- connections: connections,
- queue: make(chan *pubsub.Message, 100),
- subscriber: subscriber,
- log: log,
- exit: make(chan struct{}),
- }
-
- wp.resPool.New = func() interface{} {
- return make(map[string]struct{}, 10)
- }
-
- // start 10 workers
- for i := 0; i < 50; i++ {
- wp.do()
- }
-
- return wp
-}
-
-func (wp *WorkersPool) Queue(msg *pubsub.Message) {
- wp.queue <- msg
-}
-
-func (wp *WorkersPool) Stop() {
- for i := 0; i < 50; i++ {
- wp.exit <- struct{}{}
- }
-
- close(wp.exit)
-}
-
-func (wp *WorkersPool) put(res map[string]struct{}) {
- // optimized
- // https://go-review.googlesource.com/c/go/+/110055/
- // not O(n), but O(1)
- for k := range res {
- delete(res, k)
- }
-}
-
-func (wp *WorkersPool) get() map[string]struct{} {
- return wp.resPool.Get().(map[string]struct{})
-}
-
-// Response from the server
-type Response struct {
- Topic string `json:"topic"`
- Payload string `json:"payload"`
-}
-
-func (wp *WorkersPool) do() { //nolint:gocognit
- go func() {
- for {
- select {
- case msg, ok := <-wp.queue:
- if !ok {
- return
- }
- _ = msg
- if msg == nil || msg.Topic == "" {
- continue
- }
-
- // get free map
- res := wp.get()
-
- // get connections for the particular topic
- wp.subscriber.Connections(msg.Topic, res)
-
- if len(res) == 0 {
- wp.log.Info("no connections associated with provided topic", "topic", msg.Topic)
- wp.put(res)
- continue
- }
-
- // res is a map with a connectionsID
- for connID := range res {
- c, ok := wp.connections.Load(connID)
- if !ok {
- wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.Topic)
- wp.put(res)
- continue
- }
-
- d, err := json.Marshal(&Response{
- Topic: msg.Topic,
- Payload: utils.AsString(msg.Payload),
- })
-
- if err != nil {
- wp.log.Error("error marshaling response", "error", err)
- wp.put(res)
- break
- }
-
- // put data into the bytes buffer
- err = c.(*connection.Connection).Write(d)
- if err != nil {
- wp.log.Error("error sending payload over the connection", "error", err, "topic", msg.Topic)
- wp.put(res)
- continue
- }
- }
- case <-wp.exit:
- wp.log.Info("get exit signal, exiting from the workers pool")
- return
- }
- }
- }()
-}
diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go
deleted file mode 100644
index 2685da7f..00000000
--- a/plugins/websockets/validator/access_validator.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package validator
-
-import (
- "net/http"
- "strings"
-
- json "github.com/json-iterator/go"
- "github.com/spiral/errors"
- handler "github.com/spiral/roadrunner/v2/pkg/worker_handler"
- "github.com/spiral/roadrunner/v2/plugins/http/attributes"
-)
-
-type AccessValidatorFn = func(r *http.Request, channels ...string) (*AccessValidator, error)
-
-const (
- joinServer string = "ws:joinServer"
- joinTopics string = "ws:joinTopics"
-)
-
-type AccessValidator struct {
- Header http.Header `json:"headers"`
- Status int `json:"status"`
- Body []byte
-}
-
-func ServerAccessValidator(r *http.Request) ([]byte, error) {
- const op = errors.Op("server_access_validator")
-
- err := attributes.Set(r, "ws:joinServer", true)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- defer delete(attributes.All(r), joinServer)
-
- req := &handler.Request{
- RemoteAddr: handler.FetchIP(r.RemoteAddr),
- Protocol: r.Proto,
- Method: r.Method,
- URI: handler.URI(r),
- Header: r.Header,
- Cookies: make(map[string]string),
- RawQuery: r.URL.RawQuery,
- Attributes: attributes.All(r),
- }
-
- data, err := json.Marshal(req)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return data, nil
-}
-
-func TopicsAccessValidator(r *http.Request, topics ...string) ([]byte, error) {
- const op = errors.Op("topic_access_validator")
- err := attributes.Set(r, "ws:joinTopics", strings.Join(topics, ","))
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- defer delete(attributes.All(r), joinTopics)
-
- req := &handler.Request{
- RemoteAddr: handler.FetchIP(r.RemoteAddr),
- Protocol: r.Proto,
- Method: r.Method,
- URI: handler.URI(r),
- Header: r.Header,
- Cookies: make(map[string]string),
- RawQuery: r.URL.RawQuery,
- Attributes: attributes.All(r),
- }
-
- data, err := json.Marshal(req)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- return data, nil
-}
diff --git a/plugins/websockets/wildcard.go b/plugins/websockets/wildcard.go
deleted file mode 100644
index 2f1c6601..00000000
--- a/plugins/websockets/wildcard.go
+++ /dev/null
@@ -1,12 +0,0 @@
-package websockets
-
-import "strings"
-
-type wildcard struct {
- prefix string
- suffix string
-}
-
-func (w wildcard) match(s string) bool {
- return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix)
-}