diff options
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/commands/enums.go | 9 | ||||
-rw-r--r-- | plugins/websockets/config.go | 83 | ||||
-rw-r--r-- | plugins/websockets/connection/connection.go | 67 | ||||
-rw-r--r-- | plugins/websockets/doc/broadcast.drawio | 1 | ||||
-rw-r--r-- | plugins/websockets/doc/doc.go | 27 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 214 | ||||
-rw-r--r-- | plugins/websockets/origin.go | 28 | ||||
-rw-r--r-- | plugins/websockets/origin_test.go | 73 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 370 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 135 | ||||
-rw-r--r-- | plugins/websockets/validator/access_validator.go | 81 | ||||
-rw-r--r-- | plugins/websockets/wildcard.go | 12 |
12 files changed, 0 insertions, 1100 deletions
diff --git a/plugins/websockets/commands/enums.go b/plugins/websockets/commands/enums.go deleted file mode 100644 index 18c63be3..00000000 --- a/plugins/websockets/commands/enums.go +++ /dev/null @@ -1,9 +0,0 @@ -package commands - -type Command string - -const ( - Leave string = "leave" - Join string = "join" - Headers string = "headers" -) diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go deleted file mode 100644 index 933a12e0..00000000 --- a/plugins/websockets/config.go +++ /dev/null @@ -1,83 +0,0 @@ -package websockets - -import ( - "strings" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/pool" -) - -/* -websockets: - broker: default - allowed_origin: "*" - path: "/ws" -*/ - -// Config represents configuration for the ws plugin -type Config struct { - // http path for the websocket - Path string `mapstructure:"path"` - AllowedOrigin string `mapstructure:"allowed_origin"` - Broker string `mapstructure:"broker"` - - // wildcard origin - allowedWOrigins []wildcard - allowedOrigins []string - allowedAll bool - - // Pool with the workers for the websockets - Pool *pool.Config `mapstructure:"pool"` -} - -// InitDefault initialize default values for the ws config -func (c *Config) InitDefault() error { - if c.Path == "" { - c.Path = "/ws" - } - - // broker is mandatory - if c.Broker == "" { - return errors.Str("broker key should be specified") - } - - if c.Pool == nil { - c.Pool = &pool.Config{} - if c.Pool.NumWorkers == 0 { - // 2 workers by default - c.Pool.NumWorkers = 2 - } - - if c.Pool.AllocateTimeout == 0 { - c.Pool.AllocateTimeout = time.Minute - } - - if c.Pool.DestroyTimeout == 0 { - c.Pool.DestroyTimeout = time.Minute - } - if c.Pool.Supervisor != nil { - c.Pool.Supervisor.InitDefaults() - } - } - - if c.AllowedOrigin == "" { - c.AllowedOrigin = "*" - } - - // Normalize - origin := strings.ToLower(c.AllowedOrigin) - if origin == "*" { - // If "*" is present in the list, turn the whole list into a match all - c.allowedAll = true - return nil - } else if i := strings.IndexByte(origin, '*'); i >= 0 { - // Split the origin in two: start and end string without the * - w := wildcard{origin[0:i], origin[i+1:]} - c.allowedWOrigins = append(c.allowedWOrigins, w) - } else { - c.allowedOrigins = append(c.allowedOrigins, origin) - } - - return nil -} diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go deleted file mode 100644 index 04c29d83..00000000 --- a/plugins/websockets/connection/connection.go +++ /dev/null @@ -1,67 +0,0 @@ -package connection - -import ( - "sync" - - "github.com/fasthttp/websocket" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -// Connection represents wrapped and safe to use from the different threads websocket connection -type Connection struct { - sync.RWMutex - log logger.Logger - conn *websocket.Conn -} - -func NewConnection(wsConn *websocket.Conn, log logger.Logger) *Connection { - return &Connection{ - conn: wsConn, - log: log, - } -} - -func (c *Connection) Write(data []byte) error { - c.Lock() - defer c.Unlock() - - const op = errors.Op("websocket_write") - // handle a case when a goroutine tried to write into the closed connection - defer func() { - if r := recover(); r != nil { - c.log.Warn("panic handled, tried to write into the closed connection") - } - }() - - err := c.conn.WriteMessage(websocket.TextMessage, data) - if err != nil { - return errors.E(op, err) - } - - return nil -} - -func (c *Connection) Read() (int, []byte, error) { - const op = errors.Op("websocket_read") - - mt, data, err := c.conn.ReadMessage() - if err != nil { - return -1, nil, errors.E(op, err) - } - - return mt, data, nil -} - -func (c *Connection) Close() error { - c.Lock() - defer c.Unlock() - const op = errors.Op("websocket_close") - - err := c.conn.Close() - if err != nil { - return errors.E(op, err) - } - - return nil -} diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio deleted file mode 100644 index 230870f2..00000000 --- a/plugins/websockets/doc/broadcast.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2021-05-27T20:56:56.848Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="Pt0MY_-SPz7R7foQA1VL" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">7V1Zc9rIFv411HUeULV28WgwTjxjx06wJ8l9mRKoAU2ExEjCS3797W4t9IbAoMZLrqcqg1oLrT77OV8fOuZg8fgx9ZfzqySAUccAwWPHPOsYhmECC/0PjzwVI7ppesXILA2Dcmw9MAp/wXIQlKOrMIAZc2GeJFEeLtnBSRLHcJIzY36aJg/sZdMkYr916c+gMDCa+JE4+i0M8nkxagIA1ic+wXA2z/kzC7+6uhzI5n6QPFBD5rBjDtIkyYtPi8cBjPDyVQtT3He+4Ww9sxTG+S43PFlXl7c/+n48uTWjn1/SEMSn3V45t/ypemMYoAUoD5M0nyezJPaj4Xq0vx69TJIlukxHg//APH8q6eev8gQNzfNFVJ6Fj2H+nfr8A30GmmGXh2eYYUB18FQdxHn69J0+oG/Dx+v7yFF1o7gy5WJlySqdwIblqFjMT2cwb7qupCleLOobyoX/CJMFRBNCF6Qw8vPwnuUmv2TKWX1dfetNEqI5G6CUIN2y7eKeSoB0y2AfUsy1vG9NfvSBmsh6iDCFnEGuvFUc/53d9od/zbI/xr9On7y7rm7swSFpsooDGJTkOIBfKB75QZ3axi8Ui/xgOETOLxlaw/wUaws0ECcxrMbOQ7xYFE9x9FbLZNaBLMWwwrPp7hQPvvejVflVlpQTLv0xUvoM9fwonMXo8wQtDUzRwD1M8xAp1dPyxCIMgoJRYBb+8sfkeXiVl5iNyavY/Y59Vq87fgB8ZN641PjlzWs1S1OkgaPFdS0f3wUacD2PkbqKZPtKc/WYHntHMp1mUInUms7LSq1LSy3YUWpZLe9u0/IvL7XSpTeclxTbyjpQYvsV+gEa+Tz8fnvyATsmMMuw14O8oTRZ4Fea44PlapytxnhOMb48g3GQ4W/LictVX/aQpD9his8skR/2PjSCuYFk5eOBpjuGyyoE4zCFUOka1+H0jHssDWHrAqPIdcZ7oybS77Zlm+y6m62Qk+UR3VNAyybVRJHy27A/uh78ObwdofFPd2iVwclo+PWvIfpwfvd58EEg9cM8zOFo6RMd+ICiOZbkU6RTB0mUpORqM/ChN50QhZsmPyF1xpl4cDzFdyRxTo0D8tfknAs8sJGEumFxXrFhlwL5sA7SdKvkmDkVnzlAlb/0igzuzm7ys7zkFm2psaMt1e2XtKWiBzzy7+FaNYVJXJvLB58Yyili98pSjtPEDyZ+lq9tbvY+NGyzdALNNAxWPtsyl6brauyTLbsaOILJNAWG8H4HgiKLaRkeR9EDI6Ly0Y6hWazVdLRqmdUT1Ph/buOlNPvB+bKDNLttCZIsT4S+OUk2mpMbiHt6wFChm3X2qci3UJHsaOI52v3Fbu8VocMDWjGBtM9zdqc2/k/q7JI/0dkt/tpydh1W9TpeyQy0rwskvq6lzNcFwooeUW/uWUOgEkq1ut1Vb04iP8vCCac69XZVpyWqzs3R3qGVBvRm/hN1QambNos3qLiuruSVeuR89ztc0HyHaTffgT4U8243V/rCXsDuTsAutnxvR+GtcrKYhelxXGT1elqP+uMChg0FtOdKSM/hmbfXzO7CPJ99g+5tESiDF0HuDjUCVUkxZZDt9+Fdmc3eFQqUXNdhS0ftZBaRyXW0qgBXh2A69xx1LpboL38dnl2M8PcY4OJz92p4df31x6F+1hQ6E2lSMXB7Y5I8VJhU7AFOVoyeJKdoKPKzTOvvs+ki/1N/7D1G/t/9weWPv8r0F7Ps5h626hDrxPha7o6+ls7Yne2VvP0tTeWJ7mZqBPPWVPKxqpCw4gf7MEk+QhlA1Ltfb9CjQP9udKBoQj2woSsTzZ7jmr4kBGo13+/slO+XyWYb+f7B2Y9o8V83eFwtT/8MgssvZ3nW3afAvo8g7pmsf369ZichlC7FrngWY0cZbD0HJJ21JwjLoMQQMvXuj9d4ynGwLMX4hCmaI09iGs7EatrRvBuaXk18urMX1OTcOI7L6sS28vo9U+PTy0dLAleahGIE9+Xouae3upn0DSbO0S2dXXWrFXLqntbj4BK6oTmGAoo26SOKoAI9/WxZoIWn4SOO4mmiiqQRyBwuCGyY2LxSdevGevwsXMzQzKNwjP71f61SiF9yBmOY+mj2532MS4aplt3PWjKQllAYs3UxS+jVF9Em0lXmvu6TWGktlbIr6kx7Vlpwfytp7mgl24cWH0REQ3Qq5WQ9urlr5LnD7R3QDNNWUvTk4gqkGlWUSpqYkCLmzV3/8mL0SSDpM+MDc+IBIPMv+6Bny0L3KflrKz5wdI2DHCDbtqsCNJUpQNES7QMTeuUqcfcK8z5q0xbVZhNn7x7gG5xog31k+9llFBfwbFr6RRtzuNvuYHO4ivINTaShlcmnmwMViQ9swwikgeqZfq5ekfCJBssVtYg00dCGEpGuck9Y5S46vFvOUj8QwGgkcH1ADOhEaF79cYo+zfAnfE+JXitx3HCcJZOfmDOYR0hvxNJMrlssCrhbhBURFhY/COMZfu3iRWJ0STGwRryxMHPZ5ASeQeTK5a5EBKf5YY4EVpZZ6bO3xDWmy1Xoq2OKaUxHwjS2MssjMA2BRAyiEJZVrmeIaBtLZLlsWcSWgBg894i2WRSrii+XzOI4/67wBs3+AhmgELHaKToLlo/oX7IyoBjv5tgw43MWdQ6zcbfkXHyu9IOZ0wGcJKlPZI9cg8vKaRQiK1p/9VpMSsGpBq7qnRxIW64m+QpDXYpr0JqM+fvQ2JIfm6f8yF6vX4glPunhk/zMb5NlOCH3nBWTJTpi86xUzKGP7UkqTAKcpDAIkVIaEKW1SNKnD6KGuvGfooRsoynuTv0H9O/4KYdSdXbcNxvUavnkH6TuileJINH2AyJPyFKk2Qf5vDhdwGtWwti8+k3QVdOIOHxT4uQJmRB8fO4vwgjL+ycY3UP81Pb1isd7R6YHtCp5SKtfmc32epprK9IvkmKM6PrHAec3SzP/Haaatq2YJodJNVfu9nfPJVmNJnW71T2nSGY3GMxDM5Suw/t5vCku3lxAhUiSnTr/KMA9SvEO7X1KworAePvt53/WRs+2oXhyFK3I1ZsrWkfIzMm3egoKZhgHxAVAUkRunyRRVBa05tjEgQUFiS03rsDHZZLhwyJ+ID76PM+xnVpGK2yVXizZ1ypyp9GMdIGGTQQjx23VtnTd1mzu0dXxEbY4CEzy6fYWBeng5vLu48VngbisYtgSGAgk56vONvQCSxbMe8bYJMDpjZQWxHazDwAqW1uHXzvG7Mo2A7rCql8slhFcQJIcAjer8YhstQ7xuk3x8u4eAbchX9IY+CAiiGBGS0IEV0IEQxURJNlXtPBRmM07TIV/XAYGb0zNbRMKE2gAWJZnuLbnGIDbm6yO9y051vNV+yPPw9ao21Il52MJnFpN54n9UtoOJ/qeswWVLNywBfeMXOxe0w1qUMyWuP13UOdJL87eyVZea8MO67VjZFoWB/ppCSUCND5teiynyBbt8waswjukJ+C38raEUDcNjQuqu0drZqKLsdAn5GF0l8i7StIFCtqwoe+PbgmFk7RImj6E+bz2AnKcoszqsAgFmckk9HMYdOj6iEzuW3WZp94EyjHwY8/GSYp2XGabQzZ45ku7zJYcdPfKG2i4r9pxkNTOX2PLKhGJ+5GUJf1JvsJ4OfCQdZtKlOj//gLLWTzOSLK66MiRIkEkK1el7pG+Kr3+xTtR9u42ZW/x+7LbQTDZumZ5ki11VVMOXVPR+0huy8VCXmUNfgPymlzyua3N/JZmHY2Ckl1Ot3OMLOhm/pSkI/1s3l34S95k/4RP2GB3RQtdGnHyxOoSRokotuIO8Hu6dLuMceY6O6BYDrPurskGWK4n4hHqDsttQ4HlNBY97nUupm5I+MZzYLrt8sVI2TYl2cKrS4KJxrXYEzZeETnJc38yJ04u3eORtHeUZ//fHFEsfl+nqYtEMWT9M9QRRbRZBVEmBYyKqKsKn5VptaS8eVK4HCks3daALoqId0xqSBDulY8ZhPc8gCNb+nE1RjBVlZwgN/snMTZsVplFv617r57UeJRpCKOABmfQX0ENs5OReLwYyJJW9T4/Z76Okm5K5jekvo8OvzuIrTyhLUJPDGZNU8JSbYDL5NXGV1STV9Vk/8jBbHXh9p6RL9o23ZRkopB0Rljw/rjGBVes5y+Hp6QVawmolWDIRnf90eDrRR9ddX73mTp6D4GNubUajzxotkPvgZvvyycbKK5htzzyKkBhoCqGORv6aL03egJNdw122VtquNw1WC7Rdc1V0ROlUSPRAJzvw8Hd7fVXUZ7LJuwUKv6kEn7ck73A06sMSfWxr0NDFpIC4AxPzxWHpHzHZk9io6Vun7KEsyHW99RSwEdBhiFvI3he9spQmhSwPb7gptc/VvRiaX9DTAy8byo4EipIdioemQpiluB9U8GVUMF4cSrI9qS8Zyp4Eiq8eCHSfOFC5H5NoRXt2G0ldKtI+spDN0vcHF/UIetWP0yJge0DtMpyclhgDMZPHQZXIOWot+fmb61H2U6V8D7YsXc1dsOgfrxATXQMKyjpb0BBg2vx1E7BuKv3+Mi7q6R7pfy15T/T8KqxJVwPxVeWjdtdpR/6Cy77YVI9j+PiyofYhDE1gJBFfu4dHnuDIlSqmF8sjBSHkWHNE1VBp0zTMk3uw2AHhNvb1HXeNl3nuNxvj7Rju1yDZSTjaIZLjOJ0eSHiHRLTtjwFtOxSP2RR2a2jAWMqJfvbxIM9STwo20Z03HhQTPPyv4EJTj5eiz1A31x53uWQEj3ZPrqjolcsMSNyCWcwDkg3hE19F0ihXNJlgW/HQYrZG7ts0FV4XeuQXXs8YIm1sqT7EfCXy+fW8hXN2sCzptA+r2JSJp7UunFJhVngIUg1fAI36gynIT7PYh7C9X7lRTbrUEDfV/GeFssyslejO4+EcZdrPnLE+dt4rt9qwFGHArVgHOYa1LKJ8av9rJ/R15W/MYzCpYD1MSnPs/QDyHCFg1H3eg7LcstVNicVuDUevFP9rvF/KKX+ZQVXNXRmXDrME/QYGKicrYtnWzVqqh1z0qqGmi6BHq3bUqubjkemQ0cWqyVpPcYJ7EPJyKs4/Jcs293dxRmxCRn6hvXV+Gcy8XUFZIn8aGZNEOpxVMRykq/RwB9UvmoPv+rwEU6QaU1L3u8YjZ3MJK3TVMxMB3hq59SPi27EkjBzIhDPlIC+AkKx8xQpmfuaGufMC/VHt91qG5TCd8HGlFXlWIVcX18Wd90U0jfjOtMVSVd59Frv3eJfR+l7FOYV5qs0Ltmd4+FCjyicAbGlTXtlhDXJBWA9kTGFcyR28FvDXhyeYeVzeeO+NY98le23s3sy15rPe+/gW2MxTzAV1sEoCgXnV0mAg/7h/wA=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/websockets/doc/doc.go b/plugins/websockets/doc/doc.go deleted file mode 100644 index fc214be8..00000000 --- a/plugins/websockets/doc/doc.go +++ /dev/null @@ -1,27 +0,0 @@ -package doc - -/* -RPC message structure: - -type Msg struct { - // Topic message been pushed into. - Topics_ []string `json:"topic"` - - // Command (join, leave, headers) - Command_ string `json:"command"` - - // Broker (redis, memory) - Broker_ string `json:"broker"` - - // Payload to be broadcasted - Payload_ []byte `json:"payload"` -} - -1. Topics - string array (slice) with topics to join or leave -2. Command - string, command to apply on the provided topics -3. Broker - string, pub-sub broker to use, for the one-node systems might be used `memory` broker or `redis`. For the multi-node - -`redis` broker should be used. -4. Payload - raw byte array to send to the subscribers (binary messages). - - -*/ diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go deleted file mode 100644 index c1f79a78..00000000 --- a/plugins/websockets/executor/executor.go +++ /dev/null @@ -1,214 +0,0 @@ -package executor - -import ( - "fmt" - "net/http" - "sync" - - json "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/websockets/commands" - "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/plugins/websockets/validator" - websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" -) - -type Response struct { - Topic string `json:"topic"` - Payload []string `json:"payload"` -} - -type Executor struct { - sync.Mutex - // raw ws connection - conn *connection.Connection - log logger.Logger - - // associated connection ID - connID string - - // subscriber drivers - sub pubsub.Subscriber - actualTopics map[string]struct{} - - req *http.Request - accessValidator validator.AccessValidatorFn -} - -// NewExecutor creates protected connection and starts command loop -func NewExecutor(conn *connection.Connection, log logger.Logger, - connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor { - return &Executor{ - conn: conn, - connID: connID, - log: log, - sub: sub, - accessValidator: av, - actualTopics: make(map[string]struct{}, 10), - req: r, - } -} - -func (e *Executor) StartCommandLoop() error { //nolint:gocognit - const op = errors.Op("executor_command_loop") - for { - mt, data, err := e.conn.Read() - if err != nil { - if mt == -1 { - e.log.Info("socket was closed", "reason", err, "message type", mt) - return nil - } - - return errors.E(op, err) - } - - msg := &websocketsv1.Message{} - - err = json.Unmarshal(data, msg) - if err != nil { - e.log.Error("unmarshal message", "error", err) - continue - } - - // nil message, continue - if msg == nil { - e.log.Warn("nil message, skipping") - continue - } - - switch msg.Command { - // handle leave - case commands.Join: - e.log.Debug("received join command", "msg", msg) - - val, err := e.accessValidator(e.req, msg.Topics...) - if err != nil { - if val != nil { - e.log.Debug("validation error", "status", val.Status, "headers", val.Header, "body", val.Body) - } - - resp := &Response{ - Topic: "#join", - Payload: msg.Topics, - } - - packet, errJ := json.Marshal(resp) - if errJ != nil { - e.log.Error("marshal the body", "error", errJ) - return errors.E(op, fmt.Errorf("%v,%v", err, errJ)) - } - - errW := e.conn.Write(packet) - if errW != nil { - e.log.Error("write payload to the connection", "payload", packet, "error", errW) - return errors.E(op, fmt.Errorf("%v,%v", err, errW)) - } - - continue - } - - resp := &Response{ - Topic: "@join", - Payload: msg.Topics, - } - - packet, err := json.Marshal(resp) - if err != nil { - e.log.Error("marshal the body", "error", err) - return errors.E(op, err) - } - - err = e.conn.Write(packet) - if err != nil { - e.log.Error("write payload to the connection", "payload", packet, "error", err) - return errors.E(op, err) - } - - // subscribe to the topic - err = e.Set(msg.Topics) - if err != nil { - return errors.E(op, err) - } - - // handle leave - case commands.Leave: - e.log.Debug("received leave command", "msg", msg) - - // prepare response - resp := &Response{ - Topic: "@leave", - Payload: msg.Topics, - } - - packet, err := json.Marshal(resp) - if err != nil { - e.log.Error("marshal the body", "error", err) - return errors.E(op, err) - } - - err = e.conn.Write(packet) - if err != nil { - e.log.Error("write payload to the connection", "payload", packet, "error", err) - return errors.E(op, err) - } - - err = e.Leave(msg.Topics) - if err != nil { - return errors.E(op, err) - } - - case commands.Headers: - - default: - e.log.Warn("unknown command", "command", msg.Command) - } - } -} - -func (e *Executor) Set(topics []string) error { - // associate connection with topics - err := e.sub.Subscribe(e.connID, topics...) - if err != nil { - e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) - // in case of error, unsubscribe connection from the dead topics - _ = e.sub.Unsubscribe(e.connID, topics...) - return err - } - - // save topics for the connection - for i := 0; i < len(topics); i++ { - e.actualTopics[topics[i]] = struct{}{} - } - - return nil -} - -func (e *Executor) Leave(topics []string) error { - // remove associated connections from the storage - err := e.sub.Unsubscribe(e.connID, topics...) - if err != nil { - e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) - return err - } - - // remove topics for the connection - for i := 0; i < len(topics); i++ { - delete(e.actualTopics, topics[i]) - } - - return nil -} - -func (e *Executor) CleanUp() { - // unsubscribe particular connection from the topics - for topic := range e.actualTopics { - _ = e.sub.Unsubscribe(e.connID, topic) - } - - // clean up the actualTopics data - for k := range e.actualTopics { - delete(e.actualTopics, k) - } -} diff --git a/plugins/websockets/origin.go b/plugins/websockets/origin.go deleted file mode 100644 index c6d9c9b8..00000000 --- a/plugins/websockets/origin.go +++ /dev/null @@ -1,28 +0,0 @@ -package websockets - -import ( - "strings" -) - -func isOriginAllowed(origin string, cfg *Config) bool { - if cfg.allowedAll { - return true - } - - origin = strings.ToLower(origin) - // simple case - origin = strings.ToLower(origin) - for _, o := range cfg.allowedOrigins { - if o == origin { - return true - } - } - // check wildcards - for _, w := range cfg.allowedWOrigins { - if w.match(origin) { - return true - } - } - - return false -} diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go deleted file mode 100644 index bbc49bbb..00000000 --- a/plugins/websockets/origin_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package websockets - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestConfig_Origin(t *testing.T) { - cfg := &Config{ - AllowedOrigin: "*", - Broker: "any", - } - - err := cfg.InitDefault() - assert.NoError(t, err) - - assert.True(t, isOriginAllowed("http://some.some.some.sssome", cfg)) - assert.True(t, isOriginAllowed("http://", cfg)) - assert.True(t, isOriginAllowed("http://google.com", cfg)) - assert.True(t, isOriginAllowed("ws://*", cfg)) - assert.True(t, isOriginAllowed("*", cfg)) - assert.True(t, isOriginAllowed("you are bad programmer", cfg)) // True :( - assert.True(t, isOriginAllowed("****", cfg)) - assert.True(t, isOriginAllowed("asde!@#!!@#!%", cfg)) - assert.True(t, isOriginAllowed("http://*.domain.com", cfg)) -} - -func TestConfig_OriginWildCard(t *testing.T) { - cfg := &Config{ - AllowedOrigin: "https://*my.site.com", - Broker: "any", - } - - err := cfg.InitDefault() - assert.NoError(t, err) - - assert.True(t, isOriginAllowed("https://my.site.com", cfg)) - assert.False(t, isOriginAllowed("http://", cfg)) - assert.False(t, isOriginAllowed("http://google.com", cfg)) - assert.False(t, isOriginAllowed("ws://*", cfg)) - assert.False(t, isOriginAllowed("*", cfg)) - assert.False(t, isOriginAllowed("you are bad programmer", cfg)) // True :( - assert.False(t, isOriginAllowed("****", cfg)) - assert.False(t, isOriginAllowed("asde!@#!!@#!%", cfg)) - assert.False(t, isOriginAllowed("http://*.domain.com", cfg)) - - assert.False(t, isOriginAllowed("https://*site.com", cfg)) - assert.True(t, isOriginAllowed("https://some.my.site.com", cfg)) -} - -func TestConfig_OriginWildCard2(t *testing.T) { - cfg := &Config{ - AllowedOrigin: "https://my.*.com", - Broker: "any", - } - - err := cfg.InitDefault() - assert.NoError(t, err) - - assert.True(t, isOriginAllowed("https://my.site.com", cfg)) - assert.False(t, isOriginAllowed("http://", cfg)) - assert.False(t, isOriginAllowed("http://google.com", cfg)) - assert.False(t, isOriginAllowed("ws://*", cfg)) - assert.False(t, isOriginAllowed("*", cfg)) - assert.False(t, isOriginAllowed("you are bad programmer", cfg)) // True :( - assert.False(t, isOriginAllowed("****", cfg)) - assert.False(t, isOriginAllowed("asde!@#!!@#!%", cfg)) - assert.False(t, isOriginAllowed("http://*.domain.com", cfg)) - - assert.False(t, isOriginAllowed("https://*site.com", cfg)) - assert.True(t, isOriginAllowed("https://my.bad.com", cfg)) -} diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go deleted file mode 100644 index 395b056f..00000000 --- a/plugins/websockets/plugin.go +++ /dev/null @@ -1,370 +0,0 @@ -package websockets - -import ( - "context" - "net/http" - "sync" - "time" - - "github.com/fasthttp/websocket" - "github.com/google/uuid" - json "github.com/json-iterator/go" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/pkg/payload" - phpPool "github.com/spiral/roadrunner/v2/pkg/pool" - "github.com/spiral/roadrunner/v2/pkg/state/process" - "github.com/spiral/roadrunner/v2/pkg/worker" - "github.com/spiral/roadrunner/v2/plugins/broadcast" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/http/attributes" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/server" - "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/plugins/websockets/executor" - "github.com/spiral/roadrunner/v2/plugins/websockets/pool" - "github.com/spiral/roadrunner/v2/plugins/websockets/validator" -) - -const ( - PluginName string = "websockets" - - RrMode string = "RR_MODE" - RrBroadcastPath string = "RR_BROADCAST_PATH" -) - -type Plugin struct { - sync.RWMutex - - // subscriber+reader interfaces - subReader pubsub.SubReader - // broadcaster - broadcaster broadcast.Broadcaster - - cfg *Config - log logger.Logger - - // global connections map - connections sync.Map - - // GO workers pool - workersPool *pool.WorkersPool - - wsUpgrade *websocket.Upgrader - serveExit chan struct{} - - // workers pool - phpPool phpPool.Pool - // server which produces commands to the pool - server server.Server - - // stop receiving messages - cancel context.CancelFunc - ctx context.Context - - // function used to validate access to the requested resource - accessValidator validator.AccessValidatorFn -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error { - const op = errors.Op("websockets_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - - err := cfg.UnmarshalKey(PluginName, &p.cfg) - if err != nil { - return errors.E(op, err) - } - - err = p.cfg.InitDefault() - if err != nil { - return errors.E(op, err) - } - - p.wsUpgrade = &websocket.Upgrader{ - HandshakeTimeout: time.Second * 60, - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return isOriginAllowed(r.Header.Get("Origin"), p.cfg) - }, - } - p.serveExit = make(chan struct{}) - p.server = server - p.log = log - p.broadcaster = b - - ctx, cancel := context.WithCancel(context.Background()) - p.ctx = ctx - p.cancel = cancel - return nil -} - -func (p *Plugin) Serve() chan error { - const op = errors.Op("websockets_plugin_serve") - errCh := make(chan error, 1) - // init broadcaster - var err error - p.subReader, err = p.broadcaster.GetDriver(p.cfg.Broker) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - go func() { - var err error - p.Lock() - defer p.Unlock() - - p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{ - Debug: p.cfg.Pool.Debug, - NumWorkers: p.cfg.Pool.NumWorkers, - MaxJobs: p.cfg.Pool.MaxJobs, - AllocateTimeout: p.cfg.Pool.AllocateTimeout, - DestroyTimeout: p.cfg.Pool.DestroyTimeout, - Supervisor: p.cfg.Pool.Supervisor, - }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path}) - if err != nil { - errCh <- errors.E(op, err) - return - } - - p.accessValidator = p.defaultAccessValidator(p.phpPool) - }() - - p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log) - - // we need here only Reader part of the interface - go func(ps pubsub.Reader) { - for { - data, err := ps.Next(p.ctx) - if err != nil { - if errors.Is(errors.TimeOut, err) { - return - } - - errCh <- errors.E(op, err) - return - } - - p.workersPool.Queue(data) - } - }(p.subReader) - - return errCh -} - -func (p *Plugin) Stop() error { - // close workers pool - p.workersPool.Stop() - // cancel context - p.cancel() - p.Lock() - if p.phpPool == nil { - p.Unlock() - return nil - } - p.phpPool.Destroy(context.Background()) - p.Unlock() - - return nil -} - -func (p *Plugin) Available() {} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Middleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != p.cfg.Path { - next.ServeHTTP(w, r) - return - } - - // we need to lock here, because accessValidator might not be set in the Serve func at the moment - p.RLock() - // before we hijacked connection, we still can write to the response headers - val, err := p.accessValidator(r) - p.RUnlock() - if err != nil { - p.log.Error("access validation") - w.WriteHeader(400) - return - } - - if val.Status != http.StatusOK { - for k, v := range val.Header { - for i := 0; i < len(v); i++ { - w.Header().Add(k, v[i]) - } - } - w.WriteHeader(val.Status) - _, _ = w.Write(val.Body) - return - } - - // upgrade connection to websocket connection - _conn, err := p.wsUpgrade.Upgrade(w, r, nil) - if err != nil { - // connection hijacked, do not use response.writer or request - p.log.Error("upgrade connection", "error", err) - return - } - - // construct safe connection protected by mutexes - safeConn := connection.NewConnection(_conn, p.log) - // generate UUID from the connection - connectionID := uuid.NewString() - // store connection - p.connections.Store(connectionID, safeConn) - - // Executor wraps a connection to have a safe abstraction - e := executor.NewExecutor(safeConn, p.log, connectionID, p.subReader, p.accessValidator, r) - p.log.Info("websocket client connected", "uuid", connectionID) - - err = e.StartCommandLoop() - if err != nil { - p.log.Error("command loop error, disconnecting", "error", err.Error()) - return - } - - // when exiting - delete the connection - p.connections.Delete(connectionID) - - // remove connection from all topics from all pub-sub drivers - e.CleanUp() - - err = r.Body.Close() - if err != nil { - p.log.Error("body close", "error", err) - } - - // close the connection on exit - err = safeConn.Close() - if err != nil { - p.log.Error("connection close", "error", err) - } - - safeConn = nil - p.log.Info("disconnected", "connectionID", connectionID) - }) -} - -// Workers returns slice with the process states for the workers -func (p *Plugin) Workers() []*process.State { - p.RLock() - defer p.RUnlock() - - workers := p.workers() - - ps := make([]*process.State, 0, len(workers)) - for i := 0; i < len(workers); i++ { - state, err := process.WorkerProcessState(workers[i]) - if err != nil { - return nil - } - ps = append(ps, state) - } - - return ps -} - -// internal -func (p *Plugin) workers() []worker.BaseProcess { - return p.phpPool.Workers() -} - -// Reset destroys the old pool and replaces it with new one, waiting for old pool to die -func (p *Plugin) Reset() error { - p.Lock() - defer p.Unlock() - const op = errors.Op("ws_plugin_reset") - p.log.Info("WS plugin got restart request. Restarting...") - p.phpPool.Destroy(context.Background()) - p.phpPool = nil - - var err error - p.phpPool, err = p.server.NewWorkerPool(context.Background(), &phpPool.Config{ - Debug: p.cfg.Pool.Debug, - NumWorkers: p.cfg.Pool.NumWorkers, - MaxJobs: p.cfg.Pool.MaxJobs, - AllocateTimeout: p.cfg.Pool.AllocateTimeout, - DestroyTimeout: p.cfg.Pool.DestroyTimeout, - Supervisor: p.cfg.Pool.Supervisor, - }, map[string]string{RrMode: "http", RrBroadcastPath: p.cfg.Path}) - if err != nil { - return errors.E(op, err) - } - - // attach validators - p.accessValidator = p.defaultAccessValidator(p.phpPool) - - p.log.Info("WS plugin successfully restarted") - return nil -} - -func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { - return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) { - const op = errors.Op("access_validator") - - p.log.Debug("validation", "topics", topics) - r = attributes.Init(r) - - // if channels len is eq to 0, we use serverValidator - if len(topics) == 0 { - ctx, err := validator.ServerAccessValidator(r) - if err != nil { - return nil, errors.E(op, err) - } - - val, err := exec(ctx, pool) - if err != nil { - return nil, errors.E(err) - } - - return val, nil - } - - ctx, err := validator.TopicsAccessValidator(r, topics...) - if err != nil { - return nil, errors.E(op, err) - } - - val, err := exec(ctx, pool) - if err != nil { - return nil, errors.E(op) - } - - if val.Status != http.StatusOK { - return val, errors.E(op, errors.Errorf("access forbidden, code: %d", val.Status)) - } - - return val, nil - } -} - -func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) { - const op = errors.Op("exec") - pd := &payload.Payload{ - Context: ctx, - } - - resp, err := pool.Exec(pd) - if err != nil { - return nil, errors.E(op, err) - } - - val := &validator.AccessValidator{ - Body: resp.Body, - } - - err = json.Unmarshal(resp.Context, val) - if err != nil { - return nil, errors.E(op, err) - } - - return val, nil -} diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go deleted file mode 100644 index 758620f6..00000000 --- a/plugins/websockets/pool/workers_pool.go +++ /dev/null @@ -1,135 +0,0 @@ -package pool - -import ( - "sync" - - json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/logger" - "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/utils" -) - -type WorkersPool struct { - subscriber pubsub.Subscriber - connections *sync.Map - resPool sync.Pool - log logger.Logger - - queue chan *pubsub.Message - exit chan struct{} -} - -// NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool { - wp := &WorkersPool{ - connections: connections, - queue: make(chan *pubsub.Message, 100), - subscriber: subscriber, - log: log, - exit: make(chan struct{}), - } - - wp.resPool.New = func() interface{} { - return make(map[string]struct{}, 10) - } - - // start 10 workers - for i := 0; i < 50; i++ { - wp.do() - } - - return wp -} - -func (wp *WorkersPool) Queue(msg *pubsub.Message) { - wp.queue <- msg -} - -func (wp *WorkersPool) Stop() { - for i := 0; i < 50; i++ { - wp.exit <- struct{}{} - } - - close(wp.exit) -} - -func (wp *WorkersPool) put(res map[string]struct{}) { - // optimized - // https://go-review.googlesource.com/c/go/+/110055/ - // not O(n), but O(1) - for k := range res { - delete(res, k) - } -} - -func (wp *WorkersPool) get() map[string]struct{} { - return wp.resPool.Get().(map[string]struct{}) -} - -// Response from the server -type Response struct { - Topic string `json:"topic"` - Payload string `json:"payload"` -} - -func (wp *WorkersPool) do() { //nolint:gocognit - go func() { - for { - select { - case msg, ok := <-wp.queue: - if !ok { - return - } - _ = msg - if msg == nil || msg.Topic == "" { - continue - } - - // get free map - res := wp.get() - - // get connections for the particular topic - wp.subscriber.Connections(msg.Topic, res) - - if len(res) == 0 { - wp.log.Info("no connections associated with provided topic", "topic", msg.Topic) - wp.put(res) - continue - } - - // res is a map with a connectionsID - for connID := range res { - c, ok := wp.connections.Load(connID) - if !ok { - wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.Topic) - wp.put(res) - continue - } - - d, err := json.Marshal(&Response{ - Topic: msg.Topic, - Payload: utils.AsString(msg.Payload), - }) - - if err != nil { - wp.log.Error("error marshaling response", "error", err) - wp.put(res) - break - } - - // put data into the bytes buffer - err = c.(*connection.Connection).Write(d) - if err != nil { - wp.log.Error("error sending payload over the connection", "error", err, "topic", msg.Topic) - wp.put(res) - continue - } - } - case <-wp.exit: - wp.log.Info("get exit signal, exiting from the workers pool") - return - } - } - }() -} diff --git a/plugins/websockets/validator/access_validator.go b/plugins/websockets/validator/access_validator.go deleted file mode 100644 index 2685da7f..00000000 --- a/plugins/websockets/validator/access_validator.go +++ /dev/null @@ -1,81 +0,0 @@ -package validator - -import ( - "net/http" - "strings" - - json "github.com/json-iterator/go" - "github.com/spiral/errors" - handler "github.com/spiral/roadrunner/v2/pkg/worker_handler" - "github.com/spiral/roadrunner/v2/plugins/http/attributes" -) - -type AccessValidatorFn = func(r *http.Request, channels ...string) (*AccessValidator, error) - -const ( - joinServer string = "ws:joinServer" - joinTopics string = "ws:joinTopics" -) - -type AccessValidator struct { - Header http.Header `json:"headers"` - Status int `json:"status"` - Body []byte -} - -func ServerAccessValidator(r *http.Request) ([]byte, error) { - const op = errors.Op("server_access_validator") - - err := attributes.Set(r, "ws:joinServer", true) - if err != nil { - return nil, errors.E(op, err) - } - - defer delete(attributes.All(r), joinServer) - - req := &handler.Request{ - RemoteAddr: handler.FetchIP(r.RemoteAddr), - Protocol: r.Proto, - Method: r.Method, - URI: handler.URI(r), - Header: r.Header, - Cookies: make(map[string]string), - RawQuery: r.URL.RawQuery, - Attributes: attributes.All(r), - } - - data, err := json.Marshal(req) - if err != nil { - return nil, errors.E(op, err) - } - - return data, nil -} - -func TopicsAccessValidator(r *http.Request, topics ...string) ([]byte, error) { - const op = errors.Op("topic_access_validator") - err := attributes.Set(r, "ws:joinTopics", strings.Join(topics, ",")) - if err != nil { - return nil, errors.E(op, err) - } - - defer delete(attributes.All(r), joinTopics) - - req := &handler.Request{ - RemoteAddr: handler.FetchIP(r.RemoteAddr), - Protocol: r.Proto, - Method: r.Method, - URI: handler.URI(r), - Header: r.Header, - Cookies: make(map[string]string), - RawQuery: r.URL.RawQuery, - Attributes: attributes.All(r), - } - - data, err := json.Marshal(req) - if err != nil { - return nil, errors.E(op, err) - } - - return data, nil -} diff --git a/plugins/websockets/wildcard.go b/plugins/websockets/wildcard.go deleted file mode 100644 index 2f1c6601..00000000 --- a/plugins/websockets/wildcard.go +++ /dev/null @@ -1,12 +0,0 @@ -package websockets - -import "strings" - -type wildcard struct { - prefix string - suffix string -} - -func (w wildcard) match(s string) bool { - return len(s) >= len(w.prefix)+len(w.suffix) && strings.HasPrefix(s, w.prefix) && strings.HasSuffix(s, w.suffix) -} |