summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xMakefile2
-rwxr-xr-xpkg/pool/static_pool.go2
-rw-r--r--plugins/logger/interface.go2
-rw-r--r--plugins/server/interface.go3
-rw-r--r--plugins/websockets/doc/broadcast.drawio2
-rw-r--r--plugins/websockets/doc/broadcast_arch.drawio1
-rw-r--r--plugins/websockets/doc/doc.go27
-rw-r--r--plugins/websockets/doc/ws.drawio1
-rw-r--r--plugins/websockets/executor/executor.go31
-rw-r--r--plugins/websockets/storage/storage_test.go7
-rw-r--r--tests/plugins/websockets/configs/.rr-websockets-init.yaml16
-rw-r--r--tests/plugins/websockets/websocket_plugin_test.go63
12 files changed, 122 insertions, 35 deletions
diff --git a/Makefile b/Makefile
index 6ff605ab..7f4c251a 100755
--- a/Makefile
+++ b/Makefile
@@ -39,6 +39,7 @@ test: ## Run application tests
go test -v -race -tags=debug ./pkg/pool
go test -v -race -tags=debug ./pkg/worker
go test -v -race -tags=debug ./pkg/worker_watcher
+ go test -v -race -tags=debug ./pkg/bst
go test -v -race -tags=debug ./tests/plugins/http
go test -v -race -tags=debug ./plugins/http/config
go test -v -race -tags=debug ./tests/plugins/informer
@@ -55,4 +56,5 @@ test: ## Run application tests
go test -v -race -tags=debug ./tests/plugins/resetter
go test -v -race -tags=debug ./tests/plugins/rpc
go test -v -race -tags=debug ./tests/plugins/kv
+ go test -v -race -tags=debug ./tests/plugins/websockets
docker-compose -f tests/docker-compose.yaml down
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 8c9d69b9..b5d97b8b 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -245,7 +245,7 @@ func (sp *StaticPool) Destroy(ctx context.Context) {
func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
return func(err error, w worker.BaseProcess) (payload.Payload, error) {
- const op = errors.Op("error encoder")
+ const op = errors.Op("error_encoder")
// just push event if on any stage was timeout error
switch {
case errors.Is(errors.ExecTTL, err):
diff --git a/plugins/logger/interface.go b/plugins/logger/interface.go
index 5bb2143b..827f9821 100644
--- a/plugins/logger/interface.go
+++ b/plugins/logger/interface.go
@@ -8,7 +8,7 @@ type Logger interface {
Error(msg string, keyvals ...interface{})
}
-// With creates a child logger and adds structured context to it
+// WithLogger creates a child logger and adds structured context to it
type WithLogger interface {
With(keyvals ...interface{}) Logger
}
diff --git a/plugins/server/interface.go b/plugins/server/interface.go
index 22f02685..0424d52d 100644
--- a/plugins/server/interface.go
+++ b/plugins/server/interface.go
@@ -14,7 +14,10 @@ type Env map[string]string
// Server creates workers for the application.
type Server interface {
+ // CmdFactory return a new command based on the .rr.yaml server.command section
CmdFactory(env Env) (func() *exec.Cmd, error)
+ // NewWorker return a new worker with provided and attached by the user listeners and environment variables
NewWorker(ctx context.Context, env Env, listeners ...events.Listener) (*worker.Process, error)
+ // NewWorkerPool return new pool of workers (PHP) with attached events listeners, env variables and based on the provided configuration
NewWorkerPool(ctx context.Context, opt pool.Config, env Env, listeners ...events.Listener) (pool.Pool, error)
}
diff --git a/plugins/websockets/doc/broadcast.drawio b/plugins/websockets/doc/broadcast.drawio
index 748fec45..f9f97a11 100644
--- a/plugins/websockets/doc/broadcast.drawio
+++ b/plugins/websockets/doc/broadcast.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-05-23T20:08:57.443Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="PhXRtBI6dFZzw_439Bi0" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">5VpZc+I4EP41VGUfoISNDx45ctUkm4NkZzIvU8KWjTbCYmWRwPz6lWzZ2JZhIByZrWWmiNU6aH19qLuthjmYLi4ZnE1uqY9IwwD+omEOG4bRtk1D/JGUZUrpdtyUEDLsq0Erwgj/RIoIFHWOfRSXBnJKCcezMtGjUYQ8XqJBxuh7eVhASflXZzBEGmHkQaJTv2KfTxS1a4BVxxXC4ST7aQOoninMRitCPIE+fS+QzPOGOWCU8vRpuhggItHLgEnnXazpzTljKOLbTFh2bm+eXvow8p5M8vrAMIh6za7ijS+zHSNfAKCalPEJDWkEyfmK2l9RbyidiWFtQfwbcb5U8oNzTgVpwqdE9aIF5t8Kzy/iGbQs1RpKdQFZY5k1Is6W34qNwizZXE1LWtk8HRcFVUznzEMbwMgUDLIQ8U3jlEQlVIVfULBfIjpFgiExgCECOX4r6xJUKhnm4/Kp9xQLng2gzMfOVEcZT6cDykuknKpZK9GLhwIbK1KiEDsoh2L4DZK52kL/8a43HPRGT4J89SygBmf3N8+X13/+oenR+wRzNJrBBPF34R3KGhFgQgaUUJaMNn2I3MAT9Jgz+ooKPbbnonEgZ9CIF+gg+WyS9xtiHC02Ckj1dswK0JmE3ws2nwljUjB3G6yXaUkau0JvaNB/He0JcWDJf7UQJx8d4vRzGIhzj64gNrpbQlzV+YNBbGoQP54Pr/dGOQgMr1aRfXtsWzUoH1KRqyh3nM9GuaOhfP1n8/b89u7xZW+kkb0Gaac7ThA9IdKW5X4y0q6GtNg6EUFRLFGej5szMg9xFGuwix3zMtKQ4DASz57ABwnw+hIXLKKinuqYYt9PYgGGYvwTjpOlJLIzeRYlO7P6DWso1xLBQJzGBe3jeGvT3lLJjQNAb3Z+DIMp/9JedBcE/ugPbl7+arYtDXvztIFVFj2lIZKzZWTVLkdWzhFDK7BtbLVlaKV0AbTaHaNbUodMGB8NvrIhNAhidJy4CuhHz71YCvSf9z1+UNu3kFPnFLu2Y8IjHz9O9xPjqMHwhUy/O/5iPut98f2bhyGPm/aJzDDPVVbpyUvJhOoNavcoeCsTrIVi2+zG2NICt7avvQRYd6qlqb6sBMivCRLfl3eS5cifKSM+Cxid5r0ejQIc6jmK3OMNHCNyvONPl9cmPdVMLa9sqB9pFGsHdSbYBC3bdsoeMRPpnulo0zRbZnnhtlle5CA+c5P2FvRAkyaMZ2kJKMAL5JdFWhOXVIWMp0ktKPGPytDbxoo+xNNQcE7wWHzDn3OG5CZDFCEGBfcXfVlsQqwVv4WHcaa2ZbSscqCje1PXycYU3alztEBHR/24YU1eI8q96Rb1orIPbv/CB3/co5rbxjQHrxftJURDDz/qxXpy17hR5/b3jaBlmFbZpDK/sqdrrESgRsvunsox6mWM++f+zfXoatdI8gAuywV2yygDbNl6XlzvtMyjOS397Ghr4PyP3Zilu7FNmrZ9amZUTA18xNZ6jMFlYYDyI2tN0bXKOYhdfS2y23jxkHJwzCxxk1iKhn11/wlGXU3qbGPLStchDLoWma6GzAi+oTzaf0fjmHqvUirZCYFppCH3Xyt7WZW3QfmbxqIc3JOWvTQ59B/vvpw/NswL+d8AzYZhw6lUyZAnIIDRfBx7DI8FvKfXZNsqpy/tGgRd54Qnk67IAjAioZqVsLH/mcu3xP2pcNFYKGJP9ILZQnynqKb0JpfHkuzrFPqkmjeVZss+pdylbh95lMHESpIx88hHjOAIrX5aPCkZpgyOM8KIs7nHRTqU9QgkxtXRgjar0iasSvnQpgkKeNrpys4qv090hr1kzjBZneEo3MTVMXgYZR6owgU4ywsTCtQcknu4JBT6+RQG3/VBJ99HwXgVX0Vfm9Vc8k2Bswlkvkd95NdssZ77ilOoeuBExXNHnfLbp2JUQJK7FbKgVpPDy/YFnGIiLf8KkTckVz24g+lWYgnLdFtZiaTgY8y609Ltyvr7kRy1nvHpAXDk9+SFGdGKaITW1TcbO93F+Mi9j48HsjX5+Ca3+8tAtiAyq0Zi1ofiWT13rLxJtIFbXiLdt3bLQ1/IrmifVVnocNdF6rVMT0V7tw87h6y/9Wte+5SveetRdtYGDXNS9a0EZ5TBBHmvibKGOBYBAJJnC5eHY1xwxKvx+gqPyMesUHj3mdB5tnGyIBZ52k0PKk7+EBG0Wa5NOHW3I5ya0sQhhHn5BoIF+xE9jB5vX69h8+rqu1sTQYuQGVwmx2mcRQ3Vg7MwZH00vT67UXj+VrmNXb234tZIBhwmxxTN1aXL1Pet7q6a5/8C</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-05-27T16:14:01.199Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="XItYCtc_WvqT4sfzvM4H" version="14.5.1" type="device"><diagram id="fD2kwGC0DAS2S_q_IsmE" name="Page-1">7V1Zc9rIFv411HUeULV28WgwTjxjx06wJ8l9mRKoAU2ExEjCS3797W4t9IbAoMZLrqcqg1oLrT77OV8fOuZg8fgx9ZfzqySAUccAwWPHPOsYhmECC/0PjzwVI7ppesXILA2Dcmw9MAp/wXIQlKOrMIAZc2GeJFEeLtnBSRLHcJIzY36aJg/sZdMkYr916c+gMDCa+JE4+i0M8nkxagIA1ic+wXA2z/kzC7+6uhzI5n6QPFBD5rBjDtIkyYtPi8cBjPDyVQtT3He+4Ww9sxTG+S43PFlXl7c/+n48uTWjn1/SEMSn3V45t/ypemMYoAUoD5M0nyezJPaj4Xq0vx69TJIlukxHg//APH8q6eev8gQNzfNFVJ6Fj2H+nfr8A30GmmGXh2eYYUB18FQdxHn69J0+oG/Dx+v7yFF1o7gy5WJlySqdwIblqFjMT2cwb7qupCleLOobyoX/CJMFRBNCF6Qw8vPwnuUmv2TKWX1dfetNEqI5G6CUIN2y7eKeSoB0y2AfUsy1vG9NfvSBmsh6iDCFnEGuvFUc/53d9od/zbI/xr9On7y7rm7swSFpsooDGJTkOIBfKB75QZ3axi8Ui/xgOETOLxlaw/wUaws0ECcxrMbOQ7xYFE9x9FbLZNaBLMWwwrPp7hQPvvejVflVlpQTLv0xUvoM9fwonMXo8wQtDUzRwD1M8xAp1dPyxCIMgoJRYBb+8sfkeXiVl5iNyavY/Y59Vq87fgB8ZN641PjlzWs1S1OkgaPFdS0f3wUacD2PkbqKZPtKc/WYHntHMp1mUInUms7LSq1LSy3YUWpZLe9u0/IvL7XSpTeclxTbyjpQYvsV+gEa+Tz8fnvyATsmMMuw14O8oTRZ4Fea44PlapytxnhOMb48g3GQ4W/LictVX/aQpD9his8skR/2PjSCuYFk5eOBpjuGyyoE4zCFUOka1+H0jHssDWHrAqPIdcZ7oybS77Zlm+y6m62Qk+UR3VNAyybVRJHy27A/uh78ObwdofFPd2iVwclo+PWvIfpwfvd58EEg9cM8zOFo6RMd+ICiOZbkU6RTB0mUpORqM/ChN50QhZsmPyF1xpl4cDzFdyRxTo0D8tfknAs8sJGEumFxXrFhlwL5sA7SdKvkmDkVnzlAlb/0igzuzm7ys7zkFm2psaMt1e2XtKWiBzzy7+FaNYVJXJvLB58Yyili98pSjtPEDyZ+lq9tbvY+NGyzdALNNAxWPtsyl6brauyTLbsaOILJNAWG8H4HgiKLaRkeR9EDI6Ly0Y6hWazVdLRqmdUT1Ph/buOlNPvB+bKDNLttCZIsT4S+OUk2mpMbiHt6wFChm3X2qci3UJHsaOI52v3Fbu8VocMDWjGBtM9zdqc2/k/q7JI/0dkt/tpydh1W9TpeyQy0rwskvq6lzNcFwooeUW/uWUOgEkq1ut1Vb04iP8vCCac69XZVpyWqzs3R3qGVBvRm/hN1QambNos3qLiuruSVeuR89ztc0HyHaTffgT4U8243V/rCXsDuTsAutnxvR+GtcrKYhelxXGT1elqP+uMChg0FtOdKSM/hmbfXzO7CPJ99g+5tESiDF0HuDjUCVUkxZZDt9+Fdmc3eFQqUXNdhS0ftZBaRyXW0qgBXh2A69xx1LpboL38dnl2M8PcY4OJz92p4df31x6F+1hQ6E2lSMXB7Y5I8VJhU7AFOVoyeJKdoKPKzTOvvs+ki/1N/7D1G/t/9weWPv8r0F7Ps5h626hDrxPha7o6+ls7Yne2VvP0tTeWJ7mZqBPPWVPKxqpCw4gf7MEk+QhlA1Ltfb9CjQP9udKBoQj2woSsTzZ7jmr4kBGo13+/slO+XyWYb+f7B2Y9o8V83eFwtT/8MgssvZ3nW3afAvo8g7pmsf369ZichlC7FrngWY0cZbD0HJJ21JwjLoMQQMvXuj9d4ynGwLMX4hCmaI09iGs7EatrRvBuaXk18urMX1OTcOI7L6sS28vo9U+PTy0dLAleahGIE9+Xouae3upn0DSbO0S2dXXWrFXLqntbj4BK6oTmGAoo26SOKoAI9/WxZoIWn4SOO4mmiiqQRyBwuCGyY2LxSdevGevwsXMzQzKNwjP71f61SiF9yBmOY+mj2532MS4aplt3PWjKQllAYs3UxS+jVF9Em0lXmvu6TWGktlbIr6kx7Vlpwfytp7mgl24cWH0REQ3Qq5WQ9urlr5LnD7R3QDNNWUvTk4gqkGlWUSpqYkCLmzV3/8mL0SSDpM+MDc+IBIPMv+6Bny0L3KflrKz5wdI2DHCDbtqsCNJUpQNES7QMTeuUqcfcK8z5q0xbVZhNn7x7gG5xog31k+9llFBfwbFr6RRtzuNvuYHO4ivINTaShlcmnmwMViQ9swwikgeqZfq5ekfCJBssVtYg00dCGEpGuck9Y5S46vFvOUj8QwGgkcH1ADOhEaF79cYo+zfAnfE+JXitx3HCcJZOfmDOYR0hvxNJMrlssCrhbhBURFhY/COMZfu3iRWJ0STGwRryxMHPZ5ASeQeTK5a5EBKf5YY4EVpZZ6bO3xDWmy1Xoq2OKaUxHwjS2MssjMA2BRAyiEJZVrmeIaBtLZLlsWcSWgBg894i2WRSrii+XzOI4/67wBs3+AhmgELHaKToLlo/oX7IyoBjv5tgw43MWdQ6zcbfkXHyu9IOZ0wGcJKlPZI9cg8vKaRQiK1p/9VpMSsGpBq7qnRxIW64m+QpDXYpr0JqM+fvQ2JIfm6f8yF6vX4glPunhk/zMb5NlOCH3nBWTJTpi86xUzKGP7UkqTAKcpDAIkVIaEKW1SNKnD6KGuvGfooRsoynuTv0H9O/4KYdSdXbcNxvUavnkH6TuileJINH2AyJPyFKk2Qf5vDhdwGtWwti8+k3QVdOIOHxT4uQJmRB8fO4vwgjL+ycY3UP81Pb1isd7R6YHtCp5SKtfmc32epprK9IvkmKM6PrHAec3SzP/Haaatq2YJodJNVfu9nfPJVmNJnW71T2nSGY3GMxDM5Suw/t5vCku3lxAhUiSnTr/KMA9SvEO7X1KworAePvt53/WRs+2oXhyFK3I1ZsrWkfIzMm3egoKZhgHxAVAUkRunyRRVBa05tjEgQUFiS03rsDHZZLhwyJ+ID76PM+xnVpGK2yVXizZ1ypyp9GMdIGGTQQjx23VtnTd1mzu0dXxEbY4CEzy6fYWBeng5vLu48VngbisYtgSGAgk56vONvQCSxbMe8bYJMDpjZQWxHazDwAqW1uHXzvG7Mo2A7rCql8slhFcQJIcAjer8YhstQ7xuk3x8u4eAbchX9IY+CAiiGBGS0IEV0IEQxURJNlXtPBRmM07TIV/XAYGb0zNbRMKE2gAWJZnuLbnGIDbm6yO9y051vNV+yPPw9ao21Il52MJnFpN54n9UtoOJ/qeswWVLNywBfeMXOxe0w1qUMyWuP13UOdJL87eyVZea8MO67VjZFoWB/ppCSUCND5teiynyBbt8waswjukJ+C38raEUDcNjQuqu0drZqKLsdAn5GF0l8i7StIFCtqwoe+PbgmFk7RImj6E+bz2AnKcoszqsAgFmckk9HMYdOj6iEzuW3WZp94EyjHwY8/GSYp2XGabQzZ45ku7zJYcdPfKG2i4r9pxkNTOX2PLKhGJ+5GUJf1JvsJ4OfCQdZtKlOj//gLLWTzOSLK66MiRIkEkK1el7pG+Kr3+xTtR9u42ZW/x+7LbQTDZumZ5ki11VVMOXVPR+0huy8VCXmUNfgPymlzyua3N/JZmHY2Ckl1Ot3OMLOhm/pSkI/1s3l34S95k/4RP2GB3RQtdGnHyxOoSRokotuIO8Hu6dLuMceY6O6BYDrPurskGWK4n4hHqDsttQ4HlNBY97nUupm5I+MZzYLrt8sVI2TYl2cKrS4KJxrXYEzZeETnJc38yJ04u3eORtHeUZ//fHFEsfl+nqYtEMWT9M9QRRbRZBVEmBYyKqKsKn5VptaS8eVK4HCks3daALoqId0xqSBDulY8ZhPc8gCNb+nE1RjBVlZwgN/snMTZsVplFv617r57UeJRpCKOABmfQX0ENs5OReLwYyJJW9T4/Z76Okm5K5jekvo8OvzuIrTyhLUJPDGZNU8JSbYDL5NXGV1STV9Vk/8jBbHXh9p6RL9o23ZRkopB0Rljw/rjGBVes5y+Hp6QVawmolWDIRnf90eDrRR9ddX73mTp6D4GNubUajzxotkPvgZvvyycbKK5htzzyKkBhoCqGORv6aL03egJNdw122VtquNw1WC7Rdc1V0ROlUSPRAJzvw8Hd7fVXUZ7LJuwUKv6kEn7ck73A06sMSfWxr0NDFpIC4AxPzxWHpHzHZk9io6Vun7KEsyHW99RSwEdBhiFvI3he9spQmhSwPb7gptc/VvRiaX9DTAy8byo4EipIdioemQpiluB9U8GVUMF4cSrI9qS8Zyp4Eiq8eCHSfOFC5H5NoRXt2G0ldKtI+spDN0vcHF/UIetWP0yJge0DtMpyclhgDMZPHQZXIOWot+fmb61H2U6V8D7YsXc1dsOgfrxATXQMKyjpb0BBg2vx1E7BuKv3+Mi7q6R7pfy15T/T8KqxJVwPxVeWjdtdpR/6Cy77YVI9j+PiyofYhDE1gJBFfu4dHnuDIlSqmF8sjBSHkWHNE1VBp0zTMk3uw2AHhNvb1HXeNl3nuNxvj7Rju1yDZSTjaIZLjOJ0eSHiHRLTtjwFtOxSP2RR2a2jAWMqJfvbxIM9STwo20Z03HhQTPPyv4EJTj5eiz1A31x53uWQEj3ZPrqjolcsMSNyCWcwDkg3hE19F0ihXNJlgW/HQYrZG7ts0FV4XeuQXXs8YIm1sqT7EfCXy+fW8hXN2sCzptA+r2JSJp7UunFJhVngIUg1fAI36gynIT7PYh7C9X7lRTbrUEDfV/GeFssyslejO4+EcZdrPnLE+dt4rt9qwFGHArVgHOYa1LKJ8av9rJ/R15W/MYzCpYD1MSnPs/QDyHCFg1H3eg7LcstVNicVuDUevFP9rvF/KKX+ZQVXNXRmXDrME/QYGKicrYtnWzVqqh1z0qqGmi6BHq3bUqubjkemQ0cWqyVpPcYJ7EPJyKs4/Jcs293dxRmxCRn6hvXV+Gcy8XUFZIn8aGZNEOpxVMRykq/RwB9UvmoPv+rwEU6QaU1L3u8YjZ3MJK3TVMxMB3hq59SPi27EkjBzIhDPlIC+AkKx8xQpmfuaGufMC/VHt91qG5TCd8HGlFXlWIVcX18Wd90U0jfjOtMVSVd59Frv3eJfR+l7FOYV5qs0Ltmd4+FCjyicAbGlTXtlhDXJBWA9kTGFcyR28FvDXhyeYeVzeeO+NY98le23s3sy15rPe+/gW2MxTzAV1sEoCgXnV0mAg/7h/wA=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/broadcast_arch.drawio b/plugins/websockets/doc/broadcast_arch.drawio
deleted file mode 100644
index 54bba9c7..00000000
--- a/plugins/websockets/doc/broadcast_arch.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2021-05-24T11:49:56.412Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="onoEtIi9b01eD9L0oL7y" version="14.5.1" type="device"><diagram id="uVz6nu4iiJ3Jh2FwnUZb" name="Page-1">zZSxboMwEEC/hrES4JAma9I0WSpVYkCdKgdfsVXDIWMK6dfXNEcAoUjtUKUT9rvz2X4ceGybt3vDS/mEArQX+qL12IMXhsE6WrlHR05EAnZ/JplRgtgAYvUJBH2itRJQTRItoraqnMIUiwJSO2HcGGymaW+op7uWPIMZiFOu5zRRwsr+YqE/BA6gMtlvHfoUyXmfTaCSXGAzQmznsa1BtOdR3m5Bd/p6Med1j1eil5MZKOxPFpS71cthFbVJvE+Xr4k0+IZ3VOWD65pu/FwftaokmK64rjNV0PHtqZdisC4EdGV9j20aqSzEJU+7aOP6wDFpc+1mgRvSBmAstFdPHlx8uFYCzMGak0uhBeGCFFIXsTXNm9Er6TXL0dtgxDh1QXYpPXhyA1L1C23hTNvGIBcpr+y/sxUFt7bFZrYSOFaYvsPtbS2WU1uL6O9suenwuX/HRr9NtvsC</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/doc/doc.go b/plugins/websockets/doc/doc.go
new file mode 100644
index 00000000..fc214be8
--- /dev/null
+++ b/plugins/websockets/doc/doc.go
@@ -0,0 +1,27 @@
+package doc
+
+/*
+RPC message structure:
+
+type Msg struct {
+ // Topic message been pushed into.
+ Topics_ []string `json:"topic"`
+
+ // Command (join, leave, headers)
+ Command_ string `json:"command"`
+
+ // Broker (redis, memory)
+ Broker_ string `json:"broker"`
+
+ // Payload to be broadcasted
+ Payload_ []byte `json:"payload"`
+}
+
+1. Topics - string array (slice) with topics to join or leave
+2. Command - string, command to apply on the provided topics
+3. Broker - string, pub-sub broker to use, for the one-node systems might be used `memory` broker or `redis`. For the multi-node -
+`redis` broker should be used.
+4. Payload - raw byte array to send to the subscribers (binary messages).
+
+
+*/
diff --git a/plugins/websockets/doc/ws.drawio b/plugins/websockets/doc/ws.drawio
deleted file mode 100644
index 739b797a..00000000
--- a/plugins/websockets/doc/ws.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2021-05-19T17:03:39.963Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.5.1 Chrome/89.0.4389.128 Electron/12.0.7 Safari/537.36" etag="NPVoySJeOY6GMsZ1pcPw" version="14.5.1" type="device"><diagram id="WuhFehjWL4AdMcIrMOFQ" name="Page-1">7Vtbc5s6EP41nkkf7BEiYPyYW93O9MzJ1NNJe95koxg1GPkIEdv59UcCcZEBBycmkJy+JGiRhNjVft/uIg/Mq9V2ytDa+4u62B9A4G4H5vUAQsO0gPgnJbtEMnbMRLBkxFWdcsGMPGElVOOWEXFxqHXklPqcrHXhggYBXnBNhhijG73bPfX1p67REpcEswXyy9I74nJPSScQ5De+YLL00kdDoO6sUNpbCUIPuXRTEJk3A/OKUcqTq9X2CvtSe6liknGfa+5mK2M44E0G3PHp/aP73faf/ub/XD9Mh9MoGiprhHyXvjF2hQJUkzLu0SUNkH+TSy8ZjQIXy1mBaOV9vlG6FkJDCH9jznfKmijiVIg8vvLVXbwl/KccPrJU61fhzvVWzRw3dmkj4GxXGCSbv4r38mFxKx2XvJ98qVq1KVFII7bAB3SV7j/Elpgf6Acz4wq3wHSFxXrEOIZ9xMmjvg6k9ucy65dbUFwoIx5h0PNk3kfkR+pJIRLPzDyE0KDS5N/QXLiuZibkk2UgrhdCU5gJwSNmnAjfuFA3VsR1kx2BQ/KE5vF8UulrSgIev5l1ObCuMzPICfB2UOG4anDuLUUD1e/cspbV7EMwEp6odndjzavpbuXyC3OdW8kQhWGTkbk3B72/D8WW2LddtqyXm9MqmdP4n5gPWNZYUzt8nTFTAhpNxm9lPKNkvDs8D+niQT4OhNE8XDAyF7bZt6iOsBuPcDxboxicNoJndTvX2qaEcbXqhudAU3XK2psC5ZlK5hXYLh12chSDJc0NoO2Lp1665FFcLnn84okoXKMglU19Okdy+hmnTHK76iNWUexWEFdMGIsK9rD/jSRJXwoF86FyqwvRw8f3PL97aFlD0fvHj69iLiAvH/CuRwu7KjBDvLxE7cctsGc7WERoI6vZJrba2sTOn9iqcWxlN4ytxl3GVpMSKkmUwfErc8Gx0n/OZimsx+y4WqHA/fQxKNt5jrKNMTA0Lxy+MgBrn6Lt90LR1qRnFG1UB6J/4K0WthrAmwGqN8FxDnTBGNoVOihwqE1wbEtPcEx7r5JwXH9xkazgtMF0OSYso++GCGeQLy5cmCAudtrHS3yNw0AhU6eJPX4d8GaZ72gMdUQ3xPSmPk976Dwumfw7dknYAzDWHaASjWEFGpttobHZbSGvgMU5Mj+DxoaGxTk016AxDtwLWdCVfukLByeLRPiZ+LplT1PrAw0R2+y02pcus+AhwyIgJrnanBXTvSIegigS87/KnVKwjHPOVrzLdCpzuSr/ai/aKRfiehooGmCvXNl5pAjH/YkUwTPY9KYY4zTEGGh3ijFOT1nYAJaeE5mgMVC0RsTQ6nKzG4MXEHHWOJqIAxpgzUNO/cVt0tBDjJraxBt5SLkw9P5ZeAL2natpjJs54emdC74fJqmuORjPONcJnQc2DWHrNsLbOA8sh7C3X25P4w1ZAt8G21hGD9imR+c3+u8QTQ9wdMsmsHyEYxonGfIDYih3m0cWcZUrLn5ptS5V/8qY5yPUu+DhowJgZDj6QQH1wP5+dEgXU4S8aO6T0Is/3+6HC+1+/eV0LdIcCM7UpvnUoy/TLuKoR8uZM/ogM3xwtgmPVVP/6MzW6ewcVtIZqKAzuzU6m3RKZy9JnnpdxYRNP6vDThkvDY8+bhTTU7MnVc3OzF5xPo4JIMyAX3CT/ONJyd1MS5rL1ad3GduYNd97s9gGjh29mjvsf3RTPlORncrTqh5hekSvf9y4l+rVnCZ721SvXIbV3EEEcIxy0cQV5406V6mzd3zFcCYj02moUnsE26oombCk1Vlft+U53KvItfpdTDTz36Yk2JD/xMe8+Q8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 048a41ed..8db6d9c3 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -39,13 +39,13 @@ func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.St
}
}
-func (e *Executor) StartCommandLoop() error {
+func (e *Executor) StartCommandLoop() error { //nolint:gocognit
const op = errors.Op("executor_command_loop")
for {
mt, data, err := e.conn.Read()
if err != nil {
if mt == -1 {
- e.log.Error("socket was closed", "error", err, "message type", mt)
+ e.log.Info("socket was closed", "reason", err, "message type", mt)
return nil
}
@@ -83,10 +83,15 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...)
- if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
- continue
+ // subscribe to the topic
+ if br, ok := e.pubsub[msg.Broker()]; ok {
+ err = br.Subscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ // in case of error, unsubscribe connection from the dead topics
+ _ = br.Unsubscribe(msg.Topics()...)
+ continue
+ }
}
// handle leave
@@ -105,18 +110,20 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...)
- if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
- continue
- }
-
err = e.conn.Write(websocket.BinaryMessage, packet)
if err != nil {
e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
continue
}
+ if br, ok := e.pubsub[msg.Broker()]; ok {
+ err = br.Unsubscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+ }
+
case commands.Headers:
default:
diff --git a/plugins/websockets/storage/storage_test.go b/plugins/websockets/storage/storage_test.go
new file mode 100644
index 00000000..77d3180a
--- /dev/null
+++ b/plugins/websockets/storage/storage_test.go
@@ -0,0 +1,7 @@
+package storage
+
+import "testing"
+
+func TestNewStorage(t *testing.T) {
+
+}
diff --git a/tests/plugins/websockets/configs/.rr-websockets-init.yaml b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
index 9973b2dc..1b60d3e7 100644
--- a/tests/plugins/websockets/configs/.rr-websockets-init.yaml
+++ b/tests/plugins/websockets/configs/.rr-websockets-init.yaml
@@ -1,7 +1,6 @@
rpc:
listen: tcp://127.0.0.1:6001
-
server:
command: "php ../../psr-worker-bench.php"
user: ""
@@ -10,11 +9,9 @@ server:
relay_timeout: "20s"
http:
- address: 127.0.0.1:15395
+ address: 127.0.0.1:11111
max_request_size: 1024
- middleware: ["websockets"]
- uploads:
- forbid: [ ".php", ".exe", ".bat" ]
+ middleware: [ "websockets" ]
trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ]
pool:
num_workers: 2
@@ -30,16 +27,9 @@ websockets:
# pubsubs should implement PubSub interface to be collected via endure.Collects
# also, they should implement RPC methods to publish data into them
# pubsubs might use general config section or its own
-
pubsubs: [ "redis" ]
-
- # sample of the own config section for the redis pubsub driver
-
- redis:
- addrs:
- - localhost:1111
- # path used as websockets path
path: "/ws"
+
logs:
mode: development
level: debug
diff --git a/tests/plugins/websockets/websocket_plugin_test.go b/tests/plugins/websockets/websocket_plugin_test.go
index a9b90fd0..97577be8 100644
--- a/tests/plugins/websockets/websocket_plugin_test.go
+++ b/tests/plugins/websockets/websocket_plugin_test.go
@@ -1,6 +1,8 @@
package websockets
import (
+ "net/http"
+ "net/url"
"os"
"os/signal"
"sync"
@@ -8,14 +10,18 @@ import (
"testing"
"time"
+ "github.com/fasthttp/websocket"
+ json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/roadrunner/v2/plugins/config"
httpPlugin "github.com/spiral/roadrunner/v2/plugins/http"
+ "github.com/spiral/roadrunner/v2/plugins/kv/drivers/memory"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/redis"
rpcPlugin "github.com/spiral/roadrunner/v2/plugins/rpc"
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets"
+ "github.com/spiral/roadrunner/v2/utils"
"github.com/stretchr/testify/assert"
)
@@ -36,6 +42,7 @@ func TestBroadcastInit(t *testing.T) {
&redis.Plugin{},
&websockets.Plugin{},
&httpPlugin.Plugin{},
+ &memory.Plugin{},
)
assert.NoError(t, err)
@@ -84,19 +91,65 @@ func TestBroadcastInit(t *testing.T) {
}
}()
- time.Sleep(time.Second * 1000)
- t.Run("test1", test1)
- t.Run("test2", test2)
+ time.Sleep(time.Second * 1)
+ t.Run("TestWSInit", wsInit)
stopCh <- struct{}{}
wg.Wait()
}
-func test1(t *testing.T) {
+type Msg struct {
+ // Topic message been pushed into.
+ T []string `json:"topic"`
+ // Command (join, leave, headers)
+ C string `json:"command"`
+
+ // Broker (redis, memory)
+ B string `json:"broker"`
+
+ // Payload to be broadcasted
+ P []byte `json:"payload"`
}
-func test2(t *testing.T) {
+func wsInit(t *testing.T) {
+ da := websocket.Dialer{
+ Proxy: http.ProxyFromEnvironment,
+ HandshakeTimeout: time.Second * 20,
+ }
+
+ connURL := url.URL{Scheme: "ws", Host: "localhost:11111", Path: "/ws"}
+
+ c, resp, err := da.Dial(connURL.String(), nil)
+ assert.NoError(t, err)
+
+ defer func() {
+ _ = resp.Body.Close()
+ }()
+
+ m := &Msg{
+ T: []string{"foo", "foo2"},
+ C: "join",
+ B: "memory",
+ P: []byte("hello websockets"),
+ }
+ d, err := json.Marshal(m)
+ if err != nil {
+ panic(err)
+ }
+
+ err = c.WriteMessage(websocket.BinaryMessage, d)
+ assert.NoError(t, err)
+
+ _, msg, err := c.ReadMessage()
+ retMsg := utils.AsString(msg)
+ assert.NoError(t, err)
+
+ // subscription done
+ assert.Equal(t, `{"topic":"@join","payload":["foo","foo2"]}`, retMsg)
+
+ err = c.WriteControl(websocket.CloseMessage, nil, time.Time{})
+ assert.NoError(t, err)
}