diff options
-rwxr-xr-x | Makefile | 2 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 2 | ||||
-rw-r--r-- | plugins/logger/interface.go | 2 | ||||
-rw-r--r-- | plugins/server/interface.go | 3 | ||||
-rw-r--r-- | plugins/websockets/doc/broadcast.drawio | 2 | ||||
-rw-r--r-- | plugins/websockets/doc/broadcast_arch.drawio | 1 | ||||
-rw-r--r-- | plugins/websockets/doc/doc.go | 27 | ||||
-rw-r--r-- | plugins/websockets/doc/ws.drawio | 1 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 31 | ||||
-rw-r--r-- | plugins/websockets/storage/storage_test.go | 7 | ||||
-rw-r--r-- | tests/plugins/websockets/configs/.rr-websockets-init.yaml | 16 | ||||
-rw-r--r-- | tests/plugins/websockets/websocket_plugin_test.go | 63 |
12 files changed, 122 insertions, 35 deletions
@@ -39,6 +39,7 @@ test: ## Run application tests go test -v -race -tags=debug ./pkg/pool go test -v -race -tags=debug ./pkg/worker go test -v -race -tags=debug ./pkg/worker_watcher + go test -v -race -tags=debug ./pkg/bst go test -v -race -tags=debug ./tests/plugins/http go test -v -race -tags=debug ./plugins/http/config go test -v -race -tags=debug ./tests/plugins/informer @@ -55,4 +56,5 @@ test: ## Run application tests go test -v -race -tags=debug ./tests/plugins/resetter go test -v -race -tags=debug ./tests/plugins/rpc go test -v -race -tags=debug ./tests/plugins/kv + go test -v -race -tags=debug ./tests/plugins/websockets docker-compose -f tests/docker-compose.yaml down diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 8c9d69b9..b5d97b8b 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -245,7 +245,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) { func defaultErrEncoder(sp *StaticPool) ErrorEncoder { return func(err error, w worker.BaseProcess) (payload.Payload, error) { - const op = errors.Op("error encoder") + const op = errors.Op("error_encoder") // just push event if on any stage was timeout error switch { case errors.Is(errors.ExecTTL, err): diff --git a/plugins/logger/interface.go b/plugins/logger/interface.go index 5bb2143b..827f9821 100644 --- a/plugins/logger/interface.go +++ b/plugins/logger/interface.go @@ -8,7 +8,7 @@ type Logger interface { Error(msg string, keyvals ...interface{}) } -// With creates a child logger and adds structured context to it +// WithLogger creates a child logger and adds structured context to it type WithLogger interface { With(keyvals ...interface{}) Logger } diff --git a/plugins/server/interface.go b/plugins/server/interface.go index 22f02685..0424d52d 100644 --- a/plugins/server/interface.go +++ b/plugins/server/interface.go @@ -14,7 +14,10 @@ type Env map[string]string // Server creates workers for the application. type Server interface { + // CmdFactory return a new command based on the .rr.yaml server.command section CmdFactory(env Env) (func() *exec.Cmd, error) + // NewWorker return a new worker with provided and attached by the user listeners and environment variables NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error) + // NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error) } diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio index 748fec45..f9f97a11 100644 --- a/plugins/websockets/doc/broadcast.drawio +++ b/plugins/websockets/doc/broadcast.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-05-23T20:08:57.443Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="PhXRtBI6dFZzw_439Bi0" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">5VpZc+I4EP41VGUfoISNDx45ctUkm4NkZzIvU8KWjTbCYmWRwPz6lWzZ2JZhIByZrWWmiNU6aH19qLuthjmYLi4ZnE1uqY9IwwD+omEOG4bRtk1D/JGUZUrpdtyUEDLsq0Erwgj/RIoIFHWOfRSXBnJKCcezMtGjUYQ8XqJBxuh7eVhASflXZzBEGmHkQaJTv2KfTxS1a4BVxxXC4ST7aQOoninMRitCPIE+fS+QzPOGOWCU8vRpuhggItHLgEnnXazpzTljKOLbTFh2bm+eXvow8p5M8vrAMIh6za7ijS+zHSNfAKCalPEJDWkEyfmK2l9RbyidiWFtQfwbcb5U8oNzTgVpwqdE9aIF5t8Kzy/iGbQs1RpKdQFZY5k1Is6W34qNwizZXE1LWtk8HRcFVUznzEMbwMgUDLIQ8U3jlEQlVIVfULBfIjpFgiExgCECOX4r6xJUKhnm4/Kp9xQLng2gzMfOVEcZT6cDykuknKpZK9GLhwIbK1KiEDsoh2L4DZK52kL/8a43HPRGT4J89SygBmf3N8+X13/+oenR+wRzNJrBBPF34R3KGhFgQgaUUJaMNn2I3MAT9Jgz+ooKPbbnonEgZ9CIF+gg+WyS9xtiHC02Ckj1dswK0JmE3ws2nwljUjB3G6yXaUkau0JvaNB/He0JcWDJf7UQJx8d4vRzGIhzj64gNrpbQlzV+YNBbGoQP54Pr/dGOQgMr1aRfXtsWzUoH1KRqyh3nM9GuaOhfP1n8/b89u7xZW+kkb0Gaac7ThA9IdKW5X4y0q6GtNg6EUFRLFGej5szMg9xFGuwix3zMtKQ4DASz57ABwnw+hIXLKKinuqYYt9PYgGGYvwTjpOlJLIzeRYlO7P6DWso1xLBQJzGBe3jeGvT3lLJjQNAb3Z+DIMp/9JedBcE/ugPbl7+arYtDXvztIFVFj2lIZKzZWTVLkdWzhFDK7BtbLVlaKV0AbTaHaNbUodMGB8NvrIhNAhidJy4CuhHz71YCvSf9z1+UNu3kFPnFLu2Y8IjHz9O9xPjqMHwhUy/O/5iPut98f2bhyGPm/aJzDDPVVbpyUvJhOoNavcoeCsTrIVi2+zG2NICt7avvQRYd6qlqb6sBMivCRLfl3eS5cifKSM+Cxid5r0ejQIc6jmK3OMNHCNyvONPl9cmPdVMLa9sqB9pFGsHdSbYBC3bdsoeMRPpnulo0zRbZnnhtlle5CA+c5P2FvRAkyaMZ2kJKMAL5JdFWhOXVIWMp0ktKPGPytDbxoo+xNNQcE7wWHzDn3OG5CZDFCEGBfcXfVlsQqwVv4WHcaa2ZbSscqCje1PXycYU3alztEBHR/24YU1eI8q96Rb1orIPbv/CB3/co5rbxjQHrxftJURDDz/qxXpy17hR5/b3jaBlmFbZpDK/sqdrrESgRsvunsox6mWM++f+zfXoatdI8gAuywV2yygDbNl6XlzvtMyjOS397Ghr4PyP3Zilu7FNmrZ9amZUTA18xNZ6jMFlYYDyI2tN0bXKOYhdfS2y23jxkHJwzCxxk1iKhn11/wlGXU3qbGPLStchDLoWma6GzAi+oTzaf0fjmHqvUirZCYFppCH3Xyt7WZW3QfmbxqIc3JOWvTQ59B/vvpw/NswL+d8AzYZhw6lUyZAnIIDRfBx7DI8FvKfXZNsqpy/tGgRd54Qnk67IAjAioZqVsLH/mcu3xP2pcNFYKGJP9ILZQnynqKb0JpfHkuzrFPqkmjeVZss+pdylbh95lMHESpIx88hHjOAIrX5aPCkZpgyOM8KIs7nHRTqU9QgkxtXRgjar0iasSvnQpgkKeNrpys4qv090hr1kzjBZneEo3MTVMXgYZR6owgU4ywsTCtQcknu4JBT6+RQG3/VBJ99HwXgVX0Vfm9Vc8k2Bswlkvkd95NdssZ77ilOoeuBExXNHnfLbp2JUQJK7FbKgVpPDy/YFnGIiLf8KkTckVz24g+lWYgnLdFtZiaTgY8y609Ltyvr7kRy1nvHpAXDk9+SFGdGKaITW1TcbO93F+Mi9j48HsjX5+Ca3+8tAtiAyq0Zi1ofiWT13rLxJtIFbXiLdt3bLQ1/IrmifVVnocNdF6rVMT0V7tw87h6y/9Wte+5SveetRdtYGDXNS9a0EZ5TBBHmvibKGOBYBAJJnC5eHY1xwxKvx+gqPyMesUHj3mdB5tnGyIBZ52k0PKk7+EBG0Wa5NOHW3I5ya0sQhhHn5BoIF+xE9jB5vX69h8+rqu1sTQYuQGVwmx2mcRQ3Vg7MwZH00vT67UXj+VrmNXb234tZIBhwmxxTN1aXL1Pet7q6a5/8C</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-05-27T16:14:01.199Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="XItYCtc_WvqT4sfzvM4H" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">7V1Zc9rIFv411HUeULV28WgwTjxjx06wJ8l9mRKoAU2ExEjCS3797W4t9IbAoMZLrqcqg1oLrT77OV8fOuZg8fgx9ZfzqySAUccAwWPHPOsYhmECC/0PjzwVI7ppesXILA2Dcmw9MAp/wXIQlKOrMIAZc2GeJFEeLtnBSRLHcJIzY36aJg/sZdMkYr916c+gMDCa+JE4+i0M8nkxagIA1ic+wXA2z/kzC7+6uhzI5n6QPFBD5rBjDtIkyYtPi8cBjPDyVQtT3He+4Ww9sxTG+S43PFlXl7c/+n48uTWjn1/SEMSn3V45t/ypemMYoAUoD5M0nyezJPaj4Xq0vx69TJIlukxHg//APH8q6eev8gQNzfNFVJ6Fj2H+nfr8A30GmmGXh2eYYUB18FQdxHn69J0+oG/Dx+v7yFF1o7gy5WJlySqdwIblqFjMT2cwb7qupCleLOobyoX/CJMFRBNCF6Qw8vPwnuUmv2TKWX1dfetNEqI5G6CUIN2y7eKeSoB0y2AfUsy1vG9NfvSBmsh6iDCFnEGuvFUc/53d9od/zbI/xr9On7y7rm7swSFpsooDGJTkOIBfKB75QZ3axi8Ui/xgOETOLxlaw/wUaws0ECcxrMbOQ7xYFE9x9FbLZNaBLMWwwrPp7hQPvvejVflVlpQTLv0xUvoM9fwonMXo8wQtDUzRwD1M8xAp1dPyxCIMgoJRYBb+8sfkeXiVl5iNyavY/Y59Vq87fgB8ZN641PjlzWs1S1OkgaPFdS0f3wUacD2PkbqKZPtKc/WYHntHMp1mUInUms7LSq1LSy3YUWpZLe9u0/IvL7XSpTeclxTbyjpQYvsV+gEa+Tz8fnvyATsmMMuw14O8oTRZ4Fea44PlapytxnhOMb48g3GQ4W/LictVX/aQpD9his8skR/2PjSCuYFk5eOBpjuGyyoE4zCFUOka1+H0jHssDWHrAqPIdcZ7oybS77Zlm+y6m62Qk+UR3VNAyybVRJHy27A/uh78ObwdofFPd2iVwclo+PWvIfpwfvd58EEg9cM8zOFo6RMd+ICiOZbkU6RTB0mUpORqM/ChN50QhZsmPyF1xpl4cDzFdyRxTo0D8tfknAs8sJGEumFxXrFhlwL5sA7SdKvkmDkVnzlAlb/0igzuzm7ys7zkFm2psaMt1e2XtKWiBzzy7+FaNYVJXJvLB58Yyili98pSjtPEDyZ+lq9tbvY+NGyzdALNNAxWPtsyl6brauyTLbsaOILJNAWG8H4HgiKLaRkeR9EDI6Ly0Y6hWazVdLRqmdUT1Ph/buOlNPvB+bKDNLttCZIsT4S+OUk2mpMbiHt6wFChm3X2qci3UJHsaOI52v3Fbu8VocMDWjGBtM9zdqc2/k/q7JI/0dkt/tpydh1W9TpeyQy0rwskvq6lzNcFwooeUW/uWUOgEkq1ut1Vb04iP8vCCac69XZVpyWqzs3R3qGVBvRm/hN1QambNos3qLiuruSVeuR89ztc0HyHaTffgT4U8243V/rCXsDuTsAutnxvR+GtcrKYhelxXGT1elqP+uMChg0FtOdKSM/hmbfXzO7CPJ99g+5tESiDF0HuDjUCVUkxZZDt9+Fdmc3eFQqUXNdhS0ftZBaRyXW0qgBXh2A69xx1LpboL38dnl2M8PcY4OJz92p4df31x6F+1hQ6E2lSMXB7Y5I8VJhU7AFOVoyeJKdoKPKzTOvvs+ki/1N/7D1G/t/9weWPv8r0F7Ps5h626hDrxPha7o6+ls7Yne2VvP0tTeWJ7mZqBPPWVPKxqpCw4gf7MEk+QhlA1Ltfb9CjQP9udKBoQj2woSsTzZ7jmr4kBGo13+/slO+XyWYb+f7B2Y9o8V83eFwtT/8MgssvZ3nW3afAvo8g7pmsf369ZichlC7FrngWY0cZbD0HJJ21JwjLoMQQMvXuj9d4ynGwLMX4hCmaI09iGs7EatrRvBuaXk18urMX1OTcOI7L6sS28vo9U+PTy0dLAleahGIE9+Xouae3upn0DSbO0S2dXXWrFXLqntbj4BK6oTmGAoo26SOKoAI9/WxZoIWn4SOO4mmiiqQRyBwuCGyY2LxSdevGevwsXMzQzKNwjP71f61SiF9yBmOY+mj2532MS4aplt3PWjKQllAYs3UxS+jVF9Em0lXmvu6TWGktlbIr6kx7Vlpwfytp7mgl24cWH0REQ3Qq5WQ9urlr5LnD7R3QDNNWUvTk4gqkGlWUSpqYkCLmzV3/8mL0SSDpM+MDc+IBIPMv+6Bny0L3KflrKz5wdI2DHCDbtqsCNJUpQNES7QMTeuUqcfcK8z5q0xbVZhNn7x7gG5xog31k+9llFBfwbFr6RRtzuNvuYHO4ivINTaShlcmnmwMViQ9swwikgeqZfq5ekfCJBssVtYg00dCGEpGuck9Y5S46vFvOUj8QwGgkcH1ADOhEaF79cYo+zfAnfE+JXitx3HCcJZOfmDOYR0hvxNJMrlssCrhbhBURFhY/COMZfu3iRWJ0STGwRryxMHPZ5ASeQeTK5a5EBKf5YY4EVpZZ6bO3xDWmy1Xoq2OKaUxHwjS2MssjMA2BRAyiEJZVrmeIaBtLZLlsWcSWgBg894i2WRSrii+XzOI4/67wBs3+AhmgELHaKToLlo/oX7IyoBjv5tgw43MWdQ6zcbfkXHyu9IOZ0wGcJKlPZI9cg8vKaRQiK1p/9VpMSsGpBq7qnRxIW64m+QpDXYpr0JqM+fvQ2JIfm6f8yF6vX4glPunhk/zMb5NlOCH3nBWTJTpi86xUzKGP7UkqTAKcpDAIkVIaEKW1SNKnD6KGuvGfooRsoynuTv0H9O/4KYdSdXbcNxvUavnkH6TuileJINH2AyJPyFKk2Qf5vDhdwGtWwti8+k3QVdOIOHxT4uQJmRB8fO4vwgjL+ycY3UP81Pb1isd7R6YHtCp5SKtfmc32epprK9IvkmKM6PrHAec3SzP/Haaatq2YJodJNVfu9nfPJVmNJnW71T2nSGY3GMxDM5Suw/t5vCku3lxAhUiSnTr/KMA9SvEO7X1KworAePvt53/WRs+2oXhyFK3I1ZsrWkfIzMm3egoKZhgHxAVAUkRunyRRVBa05tjEgQUFiS03rsDHZZLhwyJ+ID76PM+xnVpGK2yVXizZ1ypyp9GMdIGGTQQjx23VtnTd1mzu0dXxEbY4CEzy6fYWBeng5vLu48VngbisYtgSGAgk56vONvQCSxbMe8bYJMDpjZQWxHazDwAqW1uHXzvG7Mo2A7rCql8slhFcQJIcAjer8YhstQ7xuk3x8u4eAbchX9IY+CAiiGBGS0IEV0IEQxURJNlXtPBRmM07TIV/XAYGb0zNbRMKE2gAWJZnuLbnGIDbm6yO9y051vNV+yPPw9ao21Il52MJnFpN54n9UtoOJ/qeswWVLNywBfeMXOxe0w1qUMyWuP13UOdJL87eyVZea8MO67VjZFoWB/ppCSUCND5teiynyBbt8waswjukJ+C38raEUDcNjQuqu0drZqKLsdAn5GF0l8i7StIFCtqwoe+PbgmFk7RImj6E+bz2AnKcoszqsAgFmckk9HMYdOj6iEzuW3WZp94EyjHwY8/GSYp2XGabQzZ45ku7zJYcdPfKG2i4r9pxkNTOX2PLKhGJ+5GUJf1JvsJ4OfCQdZtKlOj//gLLWTzOSLK66MiRIkEkK1el7pG+Kr3+xTtR9u42ZW/x+7LbQTDZumZ5ki11VVMOXVPR+0huy8VCXmUNfgPymlzyua3N/JZmHY2Ckl1Ot3OMLOhm/pSkI/1s3l34S95k/4RP2GB3RQtdGnHyxOoSRokotuIO8Hu6dLuMceY6O6BYDrPurskGWK4n4hHqDsttQ4HlNBY97nUupm5I+MZzYLrt8sVI2TYl2cKrS4KJxrXYEzZeETnJc38yJ04u3eORtHeUZ//fHFEsfl+nqYtEMWT9M9QRRbRZBVEmBYyKqKsKn5VptaS8eVK4HCks3daALoqId0xqSBDulY8ZhPc8gCNb+nE1RjBVlZwgN/snMTZsVplFv617r57UeJRpCKOABmfQX0ENs5OReLwYyJJW9T4/Z76Okm5K5jekvo8OvzuIrTyhLUJPDGZNU8JSbYDL5NXGV1STV9Vk/8jBbHXh9p6RL9o23ZRkopB0Rljw/rjGBVes5y+Hp6QVawmolWDIRnf90eDrRR9ddX73mTp6D4GNubUajzxotkPvgZvvyycbKK5htzzyKkBhoCqGORv6aL03egJNdw122VtquNw1WC7Rdc1V0ROlUSPRAJzvw8Hd7fVXUZ7LJuwUKv6kEn7ck73A06sMSfWxr0NDFpIC4AxPzxWHpHzHZk9io6Vun7KEsyHW99RSwEdBhiFvI3he9spQmhSwPb7gptc/VvRiaX9DTAy8byo4EipIdioemQpiluB9U8GVUMF4cSrI9qS8Zyp4Eiq8eCHSfOFC5H5NoRXt2G0ldKtI+spDN0vcHF/UIetWP0yJge0DtMpyclhgDMZPHQZXIOWot+fmb61H2U6V8D7YsXc1dsOgfrxATXQMKyjpb0BBg2vx1E7BuKv3+Mi7q6R7pfy15T/T8KqxJVwPxVeWjdtdpR/6Cy77YVI9j+PiyofYhDE1gJBFfu4dHnuDIlSqmF8sjBSHkWHNE1VBp0zTMk3uw2AHhNvb1HXeNl3nuNxvj7Rju1yDZSTjaIZLjOJ0eSHiHRLTtjwFtOxSP2RR2a2jAWMqJfvbxIM9STwo20Z03HhQTPPyv4EJTj5eiz1A31x53uWQEj3ZPrqjolcsMSNyCWcwDkg3hE19F0ihXNJlgW/HQYrZG7ts0FV4XeuQXXs8YIm1sqT7EfCXy+fW8hXN2sCzptA+r2JSJp7UunFJhVngIUg1fAI36gynIT7PYh7C9X7lRTbrUEDfV/GeFssyslejO4+EcZdrPnLE+dt4rt9qwFGHArVgHOYa1LKJ8av9rJ/R15W/MYzCpYD1MSnPs/QDyHCFg1H3eg7LcstVNicVuDUevFP9rvF/KKX+ZQVXNXRmXDrME/QYGKicrYtnWzVqqh1z0qqGmi6BHq3bUqubjkemQ0cWqyVpPcYJ7EPJyKs4/Jcs293dxRmxCRn6hvXV+Gcy8XUFZIn8aGZNEOpxVMRykq/RwB9UvmoPv+rwEU6QaU1L3u8YjZ3MJK3TVMxMB3hq59SPi27EkjBzIhDPlIC+AkKx8xQpmfuaGufMC/VHt91qG5TCd8HGlFXlWIVcX18Wd90U0jfjOtMVSVd59Frv3eJfR+l7FOYV5qs0Ltmd4+FCjyicAbGlTXtlhDXJBWA9kTGFcyR28FvDXhyeYeVzeeO+NY98le23s3sy15rPe+/gW2MxTzAV1sEoCgXnV0mAg/7h/wA=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/websockets/doc/broadcast_arch.drawio b/plugins/websockets/doc/broadcast_arch.drawio deleted file mode 100644 index 54bba9c7..00000000 --- a/plugins/websockets/doc/broadcast_arch.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2021-05-24T11:49:56.412Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="onoEtIi9b01eD9L0oL7y" version="14.5.1" type="device"><diagram id="uVz6nu4iiJ3Jh2FwnUZb" name="Page-1">zZSxboMwEEC/hrES4JAma9I0WSpVYkCdKgdfsVXDIWMK6dfXNEcAoUjtUKUT9rvz2X4ceGybt3vDS/mEArQX+qL12IMXhsE6WrlHR05EAnZ/JplRgtgAYvUJBH2itRJQTRItoraqnMIUiwJSO2HcGGymaW+op7uWPIMZiFOu5zRRwsr+YqE/BA6gMtlvHfoUyXmfTaCSXGAzQmznsa1BtOdR3m5Bd/p6Med1j1eil5MZKOxPFpS71cthFbVJvE+Xr4k0+IZ3VOWD65pu/FwftaokmK64rjNV0PHtqZdisC4EdGV9j20aqSzEJU+7aOP6wDFpc+1mgRvSBmAstFdPHlx8uFYCzMGak0uhBeGCFFIXsTXNm9Er6TXL0dtgxDh1QXYpPXhyA1L1C23hTNvGIBcpr+y/sxUFt7bFZrYSOFaYvsPtbS2WU1uL6O9suenwuX/HRr9NtvsC</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/websockets/doc/doc.go b/plugins/websockets/doc/doc.go new file mode 100644 index 00000000..fc214be8 --- /dev/null +++ b/plugins/websockets/doc/doc.go @@ -0,0 +1,27 @@ +package doc + +/* +RPC message structure: + +type Msg struct { + // Topic message been pushed into. + Topics_ []string `json:"topic"` + + // Command (join, leave, headers) + Command_ string `json:"command"` + + // Broker (redis, memory) + Broker_ string `json:"broker"` + + // Payload to be broadcasted + Payload_ []byte `json:"payload"` +} + +1. Topics - string array (slice) with topics to join or leave +2. Command - string, command to apply on the provided topics +3. Broker - string, pub-sub broker to use, for the one-node systems might be used `memory` broker or `redis`. For the multi-node - +`redis` broker should be used. +4. Payload - raw byte array to send to the subscribers (binary messages). + + +*/ diff --git a/plugins/websockets/doc/ws.drawio b/plugins/websockets/doc/ws.drawio deleted file mode 100644 index 739b797a..00000000 --- a/plugins/websockets/doc/ws.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 048a41ed..8db6d9c3 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -39,13 +39,13 @@ func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.St } } -func (e *Executor) StartCommandLoop() error { +func (e *Executor) StartCommandLoop() error { //nolint:gocognit const op = errors.Op("executor_command_loop") for { mt, data, err := e.conn.Read() if err != nil { if mt == -1 { - e.log.Error("socket was closed", "error", err, "message type", mt) + e.log.Info("socket was closed", "reason", err, "message type", mt) return nil } @@ -83,10 +83,15 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...) - if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) - continue + // subscribe to the topic + if br, ok := e.pubsub[msg.Broker()]; ok { + err = br.Subscribe(msg.Topics()...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) + // in case of error, unsubscribe connection from the dead topics + _ = br.Unsubscribe(msg.Topics()...) + continue + } } // handle leave @@ -105,18 +110,20 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...) - if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) - continue - } - err = e.conn.Write(websocket.BinaryMessage, packet) if err != nil { e.log.Error("error writing payload to the connection", "payload", packet, "error", err) continue } + if br, ok := e.pubsub[msg.Broker()]; ok { + err = br.Unsubscribe(msg.Topics()...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) + continue + } + } + case commands.Headers: default: diff --git a/plugins/websockets/storage/storage_test.go b/plugins/websockets/storage/storage_test.go new file mode 100644 index 00000000..77d3180a --- /dev/null +++ b/plugins/websockets/storage/storage_test.go @@ -0,0 +1,7 @@ +package storage + +import "testing" + +func TestNewStorage(t *testing.T) { + +} diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml index 9973b2dc..1b60d3e7 100644 --- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml +++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml @@ -1,7 +1,6 @@ rpc: listen: tcp://127.0.0.1:6001 - server: command: "php ../../psr-worker-bench.php" user: "" @@ -10,11 +9,9 @@ server: relay_timeout: "20s" http: - address: 127.0.0.1:15395 + address: 127.0.0.1:11111 max_request_size: 1024 - middleware: ["websockets"] - uploads: - forbid: [ ".php", ".exe", ".bat" ] + middleware: [ "websockets" ] trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] pool: num_workers: 2 @@ -30,16 +27,9 @@ websockets: # pubsubs should implement PubSub interface to be collected via endure.Collects # also, they should implement RPC methods to publish data into them # pubsubs might use general config section or its own - pubsubs: [ "redis" ] - - # sample of the own config section for the redis pubsub driver - - redis: - addrs: - - localhost:1111 - # path used as websockets path path: "/ws" + logs: mode: development level: debug diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go index a9b90fd0..97577be8 100644 --- a/tests/plugins/websockets/websocket_plugin_test.go +++ b/tests/plugins/websockets/websocket_plugin_test.go @@ -1,6 +1,8 @@ package websockets import ( + "net/http" + "net/url" "os" "os/signal" "sync" @@ -8,14 +10,18 @@ import ( "testing" "time" + "github.com/fasthttp/websocket" + json "github.com/json-iterator/go" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/roadrunner/v2/plugins/config" httpPlugin "github.com/spiral/roadrunner/v2/plugins/http" + "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/redis" rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc" "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets" + "github.com/spiral/roadrunner/v2/utils" "github.com/stretchr/testify/assert" ) @@ -36,6 +42,7 @@ func TestBroadcastInit(t *testing.T) { &redis.Plugin{}, &websockets.Plugin{}, &httpPlugin.Plugin{}, + &memory.Plugin{}, ) assert.NoError(t, err) @@ -84,19 +91,65 @@ func TestBroadcastInit(t *testing.T) { } }() - time.Sleep(time.Second * 1000) - t.Run("test1", test1) - t.Run("test2", test2) + time.Sleep(time.Second * 1) + t.Run("TestWSInit", wsInit) stopCh <- struct{}{} wg.Wait() } -func test1(t *testing.T) { +type Msg struct { + // Topic message been pushed into. + T []string `json:"topic"` + // Command (join, leave, headers) + C string `json:"command"` + + // Broker (redis, memory) + B string `json:"broker"` + + // Payload to be broadcasted + P []byte `json:"payload"` } -func test2(t *testing.T) { +func wsInit(t *testing.T) { + da := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: time.Second * 20, + } + + connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"} + + c, resp, err := da.Dial(connURL.String(), nil) + assert.NoError(t, err) + + defer func() { + _ = resp.Body.Close() + }() + + m := &Msg{ + T: []string{"foo", "foo2"}, + C: "join", + B: "memory", + P: []byte("hello websockets"), + } + d, err := json.Marshal(m) + if err != nil { + panic(err) + } + + err = c.WriteMessage(websocket.BinaryMessage, d) + assert.NoError(t, err) + + _, msg, err := c.ReadMessage() + retMsg := utils.AsString(msg) + assert.NoError(t, err) + + // subscription done + assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg) + + err = c.WriteControl(websocket.CloseMessage, nil, time.Time{}) + assert.NoError(t, err) } |