diff options
-rw-r--r-- | plugins/broadcast/doc/broadcast_arch.drawio | 2 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 4 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 25 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 2 | ||||
-rw-r--r-- | tests/plugins/broadcast/broadcast_plugin_test.go | 65 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml | 33 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-init.yaml | 10 | ||||
-rw-r--r-- | tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml | 29 |
8 files changed, 144 insertions, 26 deletions
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio index b2ee091a..fd5ff1f9 100644 --- a/plugins/broadcast/doc/broadcast_arch.drawio +++ b/plugins/broadcast/doc/broadcast_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-06-17T18:52:07.846Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="eUG-4GjLSDkFvWQsKkB7" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5u4Gv41nml3xhkkcf0YO222M5vTnHg7bT+dwUa22WLLCziX/fVHAoFBkmMcrsmmlwSEJOC9v48ujNB083gdurv1DfFwMIKa9zhCVyMIoWZC+ouVPKUlCAIjLVmFvpeWgUPBzP8H80KNl+59D0elijEhQezvyoULst3iRVwqc8OQPJSrLUlQvuvOXWGpYLZwA7n0u+/F6+zFNO1w4Xfsr9axeGXjZrV5QbR2PfJQKEKfRmgaEhKnR5vHKQ4Y+TLCpO0+H7maP1mIt3GVBlPdNL9Mrrdj6wcaz7b36x/kemzqaTf3brDnr8yfNn7KaBCS/dbDrBdthCYPaz/Gs527YFcfKNtp2TreBPQM0MOlHwRTEpAwaYs8F9vLBS2P4pD8woUr5sLG8yW9Ir8Hf7V7HMb4sVDE3+sakw2OwydahV9FNqcxFzPd4OcPJZ6lZesiuxxe6HI5WeV9HyhJDzgxzyCsYbZL2KXB/ioJm/xhLcg2LpSnf5ohONBAmeKWJVNctxUUR6bRFsWNt01xWKb4OBfoHknesvXom+SCkCPN6JvijkRf7FGHxU9JGK/Jimzd4NOhdFLmwKHOH4TsON3/wnH8xL2vu49JmSv40Y9/sOYXBj/7yTtjx1ePxZOn7GRLX7fQiJ3+zPpjJ4dmyVnW7ijfIrIPF/gZ0mThgRuucPxcPc4wRrhnxSDEgRv79+VIQMVR3vSW+PSZc/GxoHYBdGgbFv9ZkiVDEzxP+ty8D0FK8od6ueBkEtmX5MCi6ICKolMUnIIcHREd0XjYC7xQhh5z22ARQhfClsVXDQvbZRi6T4UKOyY20XFZNG2jJH2mLsSKYn1Qr74Nnq+PNK1WfcMxBCVJKdKoymQO4+DcJndfL6+ml7M/m/VyNQS1ujczDYGEiogN6ApnZrQVIut9WiRQMEcHv3aeQYLnWqTlEqoZ7Zlz0zBrWSRY1f01bZBqyQCUdOzu09WXGS26/TaZfZs0q2hLbB6hv+XMtYYUTRcC9TxCLCoa7FLRenX9TSjakPQMvUo9Q5Ke3Xy6+Xr38y0pmtKjdapo1mtXNGtImma+Sk2Tcb/Lm//ejtBn9u8NaZtp9q1ttkRp+uoBGwWgN0uau/euH7hzSmT68vt5tJ/TAy+kMhBGEgsoHeIynd3AX23p8YJSDVOSThi1/IUbXPILG9/zUiXGkf9PcqOUgTwfo/0ak5FxxfqiehulKgwkVm3JFreDhBu6zCU1Et4WlwDq0ygOHLLSqpq4Ydm47LmLqXFIXG/hRkz5fKYuS2a9mjRznoFtT1eZORvOkfmsrzkjTTZfGL3rrSmQ+a5Ap/TitAIZTSvQi3A4S3Ci2fjjMdzLsexa9c0T9QFwtOcatAOsGbBfibaqi3QHoqk3bttfJpoWKkkCsM8UHaFBS6LTazRRTrGsqjnW4KwhdAYhcrYgQTqPFY+JnFgfAONcGRVatDRwII8cjKAZsOxiV5Jd8+89m/0z2VCG+TShuKRXtd0j/ZnIgZaWj2MmpuyaXrjG0pUxz1DYNZ6k5H3SoxX/7W5YZBXIZ4VAjReXm2RnSeV5WOgyLckK8uAveQB+o89zsT4t24lla6nX5kjErwV4GZ+mX1JLot41jq+SfPHDL/yUPFjob1cf6dFsP7/Drld648LbCUYqWrs7dhgmk9SKtoXQzpdBMjGMBbxHwuIGcZczYmFhVA8pUv58pl4xFrZai4Xf5z+cDHFPW/+OJkBI1tcQoaCW5zwAeUbYdzyPyOIXjmX8p05uOgeet9RU+gg0CzkNgTsAIBHxti7kKUnK9BS1NSMJWBKRv2z9+AOD2nK/8FGi9ptA26RJeSZUoAXdwm0yKnqHF5hpMdTmlUCaN8oaBRIKzC5Zk1vy15S7FH1XhfGhl7uvjOyn3RfsyH1BEUtH2gUD2B2D/6zkzM5NikAePQkQ/tEkR25gC7LaQo4De529M+xIrLoot4NKKiOxC6AhXbehZdgmLEM9urgKouW4LKNPwUd9fdim/ukXS6i0iKZJPtkmbmqUrO5Z+qs3EbEBhRfqdDgB9jseV/JCsKIXAmUvdHLeXQ3VhVVVt/Ekqh5XoaRSGS7h+fcZLPFwyH4Y5pHjFoUqilY5HLSdR7sUPOGKmgAnHl66+yCu050bBOQBe/8joZ+DPAcs5rcSLPPim+ySBXnlrmk/D9Hp7uvYnSx2TlGonswQElay6I4ibVRN/m0NyYHvo5onw4LTRsjqKn6wnYvDQhY9u2+GC3YdP8i4To40XBwQ29xIcQvFUNuxiG0XcdzXF15owqR+HcnhBVSuUGtNr2U4aEbFgQkTfsQLqpGU0NNEJF2P/trgKHJXmBphLWBa/AqZIMZ4SIU0dBvj2e/G9ShtnIrGFbWzdKvCIGW+7rQre+pIOvs1XieJ2G2wXyUpmIxdDX8ul6SZNAXuHzBHcvb77wXMgeYojGWniDmSR+vfEfNUfVRgRaeQOYJ9OrKhQ+aZhzrtysyOXJkImTtOL5A5yHbhqYyZ0xYdgObofS58A8JsdxeXPQOaOx1PZsjo0ydo7mkLjKEqbDNMA1htJVRO1YV9rSVUqN/dM0p+aHigOaq6Uq/5lKoeV1V7dElQcpoKNYeXxziKYY3Ojs3G7A607sgMiKC1Y/QNWqNeF/gO3H/bVY1AO5PPzwWt7a79tzwxa2igdUd6LYLWtgIC6Ba0RjIANsPhfcPrF/sJnmzFNoHdBk9Zx+9W8/jir9OrxLpK4UU0WhSLls2kLqNxZSxaRoRq7Z3geo6n3GHSxTpAsCWlNPuHoXUoEfpfBEOL8E/v87Z1eYeedxS6FOH3BkLrA9qobnggtF41+de7wu1EEBqWMWhg65U8Wm0UGp2JQXPXd7S+NImrVL8dxFrvedvYQcduVSU/y3d6RqzHooVsO5ST4bXOEeuu47v+kqx+oamBI9R6VXCq+TSrHldl0OgYQp1mR9NUhZpFq8E5nfWOTnek8qI3zlcbn0SnW8vo3hfHn3TBp311O7vxnItOj0HHzjoj0IDh6Y4UW4Snx4od4LqFpw0ZE2senu4JCRuD3gf3zTc/K6fLL2MYVkVLO6wtLQ153cLd7bSMtx2LbaKdu83Kcqju4nY/D/xo/YGvcPhYiISKDRpVYgw8A1sqzjqmhdym9gVC5X2BLNUQk9WpCve6tcJZKkxPbnHo0/dmfrSvcKhqPmQ0vsKsnpbK+dA+UoQZ2XZb+01wuWBLjg7Q9x/uHAe3JPIT5AFdzUkck81IxsaTLcVKO3Xt48DfUqXKPjX5LA+qq5OlCR/PUXy9ECm0yWxNm2TgZoa33ihfrcUkh7A70//czCmY8CbGI5AA0lqq/YBVtq614QhT9lS3Ibn3PcaW0k58KZNIafj0je57bgofiIJVt6prgk0T6/Y/3/+cgL+nd5vV1PxBfp99U3096jCKV8flHyGNgoBHqWUIm1wD3aw4HN2EB1eSC0rkSjPOxLWsfCo3LoeKo8Uab9zBinEtxojD0nqXK3CUfBnQ2Gc73w5SxjcnoyolrRSfBjpuGjoIoZ57yIF9cauW1ogDvtScVcQUmvg2iZLK8g4F/NskUXo3Su0Z99rDDZxqMUUHIlMUYa0SIG/NlslhbZEJ2oIl5douj6W8zAHJmf/cjTCLhxOHROOrQztWmo5j7nOPlWwdXJnHfFRj+ByW1E61uZUyc3kBUkpPQ8I26TmA3tT4rG+Ih1mN/wM=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-06-18T09:34:25.915Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="THNfOcV33EQGG0gzo1UK" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1bc6M4Fv41rk1vVVIg7o+Jk8l01fR2Np7e7n7a4iLbbDB4AMdJ//qVQGCQZBsHEMSTviRGIAznfj4dSRNlunq5j+318kvkwWACJO9lotxOALBkE/3EDa95AzAVJW9ZxL6Xt8m7hpn/C5JGibRufA8mtQvTKApSf11vdKMwhG5aa7PjONrWL5tHQf1b1/YCMg0z1w7Y1u++ly7Ja0iStDvxO/QXy5Q+s7KLq0lDsrS9aFtpUu4myjSOojT/tHqZwgBTryBM3u+3PWfLJ4thmDbpMFV1/fPNfXhp/FAuZ+Hz8kd0f6mr+W2e7WBDXpk8bfpa0CCONqEH8V2kiXKzXfopnK1tF5/dIq6jtmW6CtCRjD7O/SCYRkEUZ30Vz4bm3EXtSRpHT7ByRndN6MzRGfY9yKs9wziFL5Um8l73MFrBNH5Fl5CzikloTMRM1cjxtsazvG1ZZZdFGm0iJ4vy3jtKog+EmCcQVtP7Jexcw3+5hM3+4B5RmFba8z/dEFyW5DrFDYOluGpyKK7oWl8U186b4qBO8ctSoAckec/WY2iSU0KuSNrQFLcY+kIPOSxyGMXpMlpEoR3c7Vpv6hzYXfNHFK0J3f8H0/SVeF97k0Z1rsAXP/2Bu19p5OgnuRn+fPtSPXgtDkL0upVO+PBncT98sOuWHRX99vItiTaxCw+QpggP7HgB00PXEYZhwh0UgxgGduo/1yMBHkdJ14fIR89cio8BpCtZBaZmkJ81WdIkyvPkz03uQUlJ+VBvF5xCIoeSHFAVHbmh6FQFpyJHe0SHNh6mC11u6OGYGo4QRAhbEV91LGzXcWy/Vi5YY7FJ9suibmo16dNVKlakr5fbXW/Kh69XJKnV9ZqlUUqSU6RTlSkcxs653Tx+vb6dXs/+7NbLtRDU5t5M1ygSciI2WeU4M62vEFkd0iLJFXO082unGSRwqkWazwGf0Z7u6JreyiKBpu6va4PUSgYAo2OPd7efZ6jp4dvN7NtNt4o2h/oe+huWI3WkaCoVqJcRYlXRgEhFG9T1d6FoY9Iz5V3qmcLo2Ze7L18ff56TonE9mlBFM967ohlj0jT9XWoai/tdf/n3w0T5Df87I23T9aG1zWQojV49wKMA6Muy7vaz7Qe2g4iMXn7jJBsHffBiJANxwrAA0SGt09kO/EWIPruIahCR9AZTy3ft4JqcWPmelysxTPxf2RflDCT5GLqvdjPRbvG9kN4muQrLDKvCKIT9IOGaynKJj4T3xSVZGdIojhyykpqauHHZuOK5q6lxHNmeaydY+XysLnNsvbo0c54GTU/lmTkTOIp+0NeckCbrb4ze1d4USP9QoGN6cVyBtK4V6E04nEE50WL8cR/uZRlmq+v1I9fLsiUd6tAPsKaBYSXaaC7SAkRT7dy2v000DaUmCbJ5ouhQHXoSnUGjiXqKZTTNsUZnDYE1CpEzKQlSSay4T+To62VZO1VGqR49DRywIwcToAc4u1jXZFf/a4Orf25WiGE+Siiu0Vlp/YJ+ZnIg5e2XKRZTfE6tnMPpyiXJUPA5kqSU90SfFuS3vcKRVcAeVQI10lzvUhxlFztx5ZZ5S9FQBn/ZA5Av+s2hr0dta7ptydy1OxKRcwGcp8fpl13FUO8eprdZvnjxBF+zB4v9cPEJfZptnEdoe7U3rrwdZaSSpb3GH+OsSK1qWyJ083mQFYbhgHdPWNwh7nJCLEyN6imclL+s1KvGwkZvsfBH/cPREPe49RdUAMFYX42GgnqueZDZirDv0Eki9wmmLP7TJjd1ZM+bSzx9lCVDsToCd2RZoRFv44otSeKmp0pfFUmywRD5c+inFxhqK/3CJ4baZ4G2MUV5OuCgBWLhNhYVfYQuxFoMJKcRSHOmrOEgobIukjWlJX9PuUvVdzUYH3q7+yrIftx9AUHuC9BYuiJdYYDd0sjPRs7s1KRILqMnCsLfm+SwHUxKVnvIccCg1TvjjsSai3I/qCQ3EruSJUVVTWBopg7qUI9Kz4LoOS4r6FPxUV+3Ye6fnnBCJSUoTfKjMHNTk2x2z9xfnEXEJnO8kNDhBDDseFzNC4GGXkiue6GjdXctVBc0Vd3Ok6h2XAWMShW4hOc/F7DEdpf9YMyjxC0ql3B6lXBQ6CTrHDwhipoBJx6c25sgbXM7OwiiLfT+G8V+CfLssJh/1mCZN3/JOpuQV781us82OX77NnaniJ1zFGogM6RQM1lUi5M28op/e0NywMeo5tGw4LgRMkTFD6Z1tZvIohbfW+CCouMHFtcpkYarHWJbGilioTBqe0lj21Uc9/2FFxJV1K8qbHgBuDPUetNrFg6aIXHAwgRfoIs0EhF6momk7aFfK5gk9gIiIywFWIvfIRPoGE/hIQ1iYzzzw7jupY3V0Lgq/UzdajBIWc47FWVPLUZnv6bLLBF7CDaLLAVjsavx13IxmolS4OEBc4XNfv++gLksWRxjKRQxV9jR+g/EPFcfHlghFDJXwJCObOyQeeGhjrsyXZAroyFzyxoEMpeLVXgaY+aohwDQXPmohe9AmE1xcdkB0NwSXMxQ0GdI0NyTXAgBL2zTdE02+kqorKYT+3pLqJRhV8+o+aHxgeZK05l63adU7bjKW6OLgZLzVKg7vDyFSQpa3GxfNaY40FqQGaBBa0sbGrRWBp3gO3L/bTY1Av0Un58KWpui/TdbmDU20FqQXtOgtcmBAMSC1goLgM1g/Nzx/MVhgieTs0yg2OCpuPGH1dw/+ev4LDFRKTyNRtNi0bOZVFk0ro5Fs4hQq7UTbM/yuCtM2lCVFdCTUurDw9AqYAj9N4Khafhn8LptlV2h5wOFrkX4g4HQ6ogWqhsfCK02Tf5VUbgdDUKDOgYtm2ojj9YahVZOxKCJ69t7PVPEVbu+H8RaHXjZ2FHHbk0lv8h3BkasL2kL2Xcox8JrwhFr0fHdcEnWsNDUyBFqtSk41X2a1Y6rLGi0D6HOs6NprkLdotXyKTcbHJ0WpPK0Ny5nGx9Fp3vL6D4mxx91wcd9dT+r8ZyKTl/Kgp11QaARw9OCFJuGpy85K8CJhac1FhPrHp4eCAm7lAcf3NfPvipH5M4YmtHQ0o5rSUuNnbfw+DCt4237YptkbYdFWwnVXT1snMBPlhdkhsOnSiRU7dCpEkPZ06DB46ylG4rd1bpASn1dIIM3xGQIVeFBl1Y4SYXRwQOMffTe2I8OFQ41zYe0zmeYtdNSNh/aJJwwo1hua7MKrl085WgHff9hOzB4iBI/Qx6UWydK02g1YbHxbEmx2kpdmzTwQ6RUxVaTB3nQXJ0Mido8h7N7ocLRJr03bWKBmxkMvUk5WwtLToS/Gf0nZo7DhLMYj1AokNbgrQfMs3W9DUforKd6iKNn38Nsqa3ElzMpqg2fnum65zq1QRRoulRdF2y6MR7+9f3PG/mv6eNqMdV/RL/Pvg27FwS131rjqBIICSspZ3LUhXHpyymc4F5ntvRX/PRdo5YJlzXaGneXr/Nff9ApnvVByaZ4b0v5aryXsFD5kkUFRNxvx/1HY2aabuvYKnkdqRSALszMqUPOjBUy9GNjzvLhHq1HkQ9Rkb9dRJcYVlMPxAQWHOnYG2to9KxijbeaSl8QFt8MMATO8dpMAhc+IodNBloTdwlX9miDwFaMAQxjOJlUb7VDXMaMqHRoXFvvdWKdOXvx7bc/Q7loTmEftcUljSt+Xq0DuIKZ6cf59Szbt2oC8NDKw2wahYi4mwzW6NR4Nt1GrGMd5S0A0Nc+tFwGsasJkX3EkvzbjhH9LGwnjXPw+SLUdrIoVIUP6ISLQXRpXWIfXuHyWI1y7ARi/CpzgekS7vrh1rzuaFP6yGyp/8ZMJlUI42cxoDASWee4Ry7S2MXIJpfFA1csGG/KXoxa+rJziSN0kByYf3wOskhJPoKkYWWgn0IXnV7dSpOkq7w8VdJNC7+uSNyMHUCa3T3+544ThIX+biPRfPNQHAqgmMpd4vYYcirnkjV0/bmfeRV/52v6KGwVEq0xqa7atFqji8Epfqo78GzCt/gMqtD11F2WBUNejTGvLuzFyXuGGSplTo7sAWape0CBxh10st5ovxAZZxUyKk2ULrBN8lHc+SszMHY2JJvg1TWh94m1YLOyuv4Cvtg4p/xErdjcxh4dWSKiqci2Mk/0yLms6iwSB4rlYrteIoLPx/e4rrHRBpBvXHPUjXUCY7ZOLL5ebJu+19rQ4dHJHQz5cIdyT6I9HXqyZywk3dPeW43lr5WpKXeO3PkRkXN++DRmK0Hu8wnDq9qO93MMWpFQdIXfqZh3ddAbnAWspZu0M+dVc/Pgxt5grWIPwyreuITuU4Uv58kKpmTCZJ21JZITgI25fuKVyHHZ72f8EHamTOnST6rMmWbPh/M/KTvxinzWZAc/niXvaKejS5xAS6gaAd6CCekmxvEuqUvMx03KAm4cLa+L9awvYuhl3CNpes5iEiWfJQ8NqmpbNjgrKyhCa+SKTcRqQy+VgRYphDCH6HN9w5rnQNfeJFj5thnoH4X/wMHOUxhtJyyUb6el5yOADsqXFvCKzZZKedn62YM7uI/9bPtBxjxSTOnkcE42PJQ9mPNafsFd6G0yqxCFQRY0zvMqzOoXZxxLEuhdNVnY7xzFkAlfAWtJVN440xsGIdBhHOGdfXaRLYoml18iD+Ir/g8=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 612b6a47..3b420a4b 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -74,8 +74,8 @@ func (p *Plugin) Serve() chan error { return errCh } default: - p.log.Warn("wrong type detected in the configuration, please, check yaml indentation") - continue + errCh <- errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation")) + return errCh } // config key for the particular sub-driver kv.memcached diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 0583be0c..664b4dfd 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -22,6 +22,7 @@ type Response struct { type Executor struct { sync.Mutex + // raw ws connection conn *connection.Connection log logger.Logger @@ -67,20 +68,20 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit err = json.Unmarshal(data, msg) if err != nil { - e.log.Error("error unmarshal message", "error", err) + e.log.Error("unmarshal message", "error", err) continue } // nil message, continue if msg == nil { - e.log.Warn("get nil message, skipping") + e.log.Warn("nil message, skipping") continue } switch msg.Command { // handle leave case commands.Join: - e.log.Debug("get join command", "msg", msg) + e.log.Debug("received join command", "msg", msg) val, err := e.accessValidator(e.req, msg.Topics...) if err != nil { @@ -95,13 +96,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, errJ := json.Marshal(resp) if errJ != nil { - e.log.Error("error marshal the body", "error", errJ) + e.log.Error("marshal the body", "error", errJ) return errors.E(op, fmt.Errorf("%v,%v", err, errJ)) } errW := e.conn.Write(packet) if errW != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", errW) + e.log.Error("write payload to the connection", "payload", packet, "error", errW) return errors.E(op, fmt.Errorf("%v,%v", err, errW)) } @@ -115,13 +116,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, err := json.Marshal(resp) if err != nil { - e.log.Error("error marshal the body", "error", err) + e.log.Error("marshal the body", "error", err) return errors.E(op, err) } err = e.conn.Write(packet) if err != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + e.log.Error("write payload to the connection", "payload", packet, "error", err) return errors.E(op, err) } @@ -133,7 +134,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit // handle leave case commands.Leave: - e.log.Debug("get leave command", "msg", msg) + e.log.Debug("received leave command", "msg", msg) // prepare response resp := &Response{ @@ -143,13 +144,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, err := json.Marshal(resp) if err != nil { - e.log.Error("error marshal the body", "error", err) + e.log.Error("marshal the body", "error", err) return errors.E(op, err) } err = e.conn.Write(packet) if err != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + e.log.Error("write payload to the connection", "payload", packet, "error", err) return errors.E(op, err) } @@ -170,7 +171,7 @@ func (e *Executor) Set(topics []string) error { // associate connection with topics err := e.sub.Subscribe(e.connID, topics...) if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) // in case of error, unsubscribe connection from the dead topics _ = e.sub.Unsubscribe(e.connID, topics...) return err @@ -188,7 +189,7 @@ func (e *Executor) Leave(topics []string) error { // remove associated connections from the storage err := e.sub.Unsubscribe(e.connID, topics...) if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) return err } diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index f0b7c6c3..ca5f2f59 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -123,7 +123,7 @@ func (p *Plugin) Serve() chan error { p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log) - // run all pubsubs drivers + // we need here only Reader part of the interface go func(ps pubsub.Reader) { for { select { diff --git a/tests/plugins/broadcast/broadcast_plugin_test.go b/tests/plugins/broadcast/broadcast_plugin_test.go index 585a81a9..ce1aed45 100644 --- a/tests/plugins/broadcast/broadcast_plugin_test.go +++ b/tests/plugins/broadcast/broadcast_plugin_test.go @@ -92,3 +92,68 @@ func TestBroadcastInit(t *testing.T) { wg.Wait() } + +func TestBroadcastConfigError(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-broadcast-config-error.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &broadcast.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + _, err = cont.Serve() + assert.Error(t, err) +} + +func TestBroadcastNoConfig(t *testing.T) { + cont, err := endure.NewContainer(nil, endure.SetLogLevel(endure.ErrorLevel)) + assert.NoError(t, err) + + cfg := &config.Viper{ + Path: "configs/.rr-broadcast-no-config.yaml", + Prefix: "rr", + } + + err = cont.RegisterAll( + cfg, + &broadcast.Plugin{}, + &rpcPlugin.Plugin{}, + &logger.ZapLogger{}, + &server.Plugin{}, + &redis.Plugin{}, + &websockets.Plugin{}, + &httpPlugin.Plugin{}, + &memory.Plugin{}, + ) + + assert.NoError(t, err) + + err = cont.Init() + if err != nil { + t.Fatal(err) + } + + // should be just disabled + _, err = cont.Serve() + assert.NoError(t, err) +} diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml new file mode 100644 index 00000000..b01dad1e --- /dev/null +++ b/tests/plugins/broadcast/configs/.rr-broadcast-config-error.yaml @@ -0,0 +1,33 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:21345 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +broadcast: + default: + driver: redis + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml index 8436b65f..aa80330e 100644 --- a/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml +++ b/tests/plugins/broadcast/configs/.rr-broadcast-init.yaml @@ -19,22 +19,12 @@ http: allocate_timeout: 60s destroy_timeout: 60s -redis: - addrs: - - "localhost:6379" - - broadcast: default: driver: redis addrs: - "localhost:6379" -websockets: - broker: default - allowed_origin: "*" - path: "/ws" - logs: mode: development level: error diff --git a/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml b/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml new file mode 100644 index 00000000..d324284d --- /dev/null +++ b/tests/plugins/broadcast/configs/.rr-broadcast-no-config.yaml @@ -0,0 +1,29 @@ +rpc: + listen: tcp://127.0.0.1:6001 + +server: + command: "php ../../psr-worker-bench.php" + user: "" + group: "" + relay: "pipes" + relay_timeout: "20s" + +http: + address: 127.0.0.1:21345 + max_request_size: 1024 + middleware: [ "websockets" ] + trusted_subnets: [ "10.0.0.0/8", "127.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "::1/128", "fc00::/7", "fe80::/10" ] + pool: + num_workers: 2 + max_jobs: 0 + allocate_timeout: 60s + destroy_timeout: 60s + +logs: + mode: development + level: error + +endure: + grace_period: 120s + print_graph: false + log_level: error |