summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-18 01:06:16 +0300
committerValery Piashchynski <[email protected]>2021-06-18 01:06:16 +0300
commitfe7bb0fe758d573fe353df028257ed66c6eccf66 (patch)
tree74392f8e61e96c85f0d8b684cfc08e3fc3664ae9 /plugins
parent68ff941c4226074206ceed9c30bd95317aa0e9fc (diff)
- Rework main parts
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/broadcast/config.go6
-rw-r--r--plugins/broadcast/doc/broadcast_arch.drawio2
-rw-r--r--plugins/broadcast/plugin.go135
-rw-r--r--plugins/kv/plugin.go5
-rw-r--r--plugins/memory/plugin.go2
-rw-r--r--plugins/memory/pubsub.go2
-rw-r--r--plugins/redis/plugin.go2
-rw-r--r--plugins/redis/pubsub.go7
-rw-r--r--plugins/websockets/config.go16
-rw-r--r--plugins/websockets/executor/executor.go41
-rw-r--r--plugins/websockets/origin_test.go9
-rw-r--r--plugins/websockets/plugin.go140
-rw-r--r--plugins/websockets/pool/workers_pool.go20
13 files changed, 220 insertions, 167 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
index 18846f30..4f1e5213 100644
--- a/plugins/broadcast/config.go
+++ b/plugins/broadcast/config.go
@@ -1,6 +1,9 @@
package broadcast
/*
+
+# Global redis config (priority - 2)
+
websockets: # <----- one of possible subscribers
path: /ws
broker: default # <------ broadcast broker to use --------------- |
@@ -8,9 +11,12 @@ websockets: # <----- one of possible subscribers
broadcast: # <-------- broadcast entry point plugin |
default: # <----------------------------------------------------- |
driver: redis
+ # local redis config (priority - 1)
test:
driver: memory
+
+priority local -> global
*/
// Config ...
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio
index b8d2947e..b2ee091a 100644
--- a/plugins/broadcast/doc/broadcast_arch.drawio
+++ b/plugins/broadcast/doc/broadcast_arch.drawio
@@ -1 +1 @@
-<mxfile host="Electron" modified="2021-06-17T16:23:35.917Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="1VvfJYAxL9mW7TkXHKVj" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5s4F/41nml3xhkkcf2YW7ud2U6z8buz7acdbGSbLUZewLnsr38lEBgkOcHhojibtHHgIMlwdK6PLkzQ5ebhc+Jv119JgKMJNIKHCbqaQAhNaNM/jPJYUBDyYEFZJWFQ0MCeMAv/xZxocOouDHDaKJgREmXhtklckDjGi6xB85OE3DeLLUnU/Natv8ISYbbwI5n6Zxhka/5ghmHsL/yKw9U6E69s/LI0J6RrPyD3NRK6nqDLhJCsONo8XOKIsa9kTFHv04Gr1Z0lOM7aVLg0bfvLxed46nxH01l8t/5OPk9ts2jmzo92/JH53WaPJQ8SsosDzFoxJujifh1meLb1F+zqPe12Sltnm4ieAXq4DKPokkQkyeuiwMfuckHpaZaQn7h2xV64eL6kV/gN4CTDDwcfDVQMo7KGyQZnySMtwisgl/OYi5lp8fP7Rp8VtHW9uzxO9LmcrKq295ykB5yZRzDWsodl7NJi/5SMzX9YDRJnNXrx0w/DgQGaHHccmeOmq+A4sq2hOG69bY7DJsenlUBrZPnA1kM3ywUhR4alm+OexF8cUIfFT0mSrcmKxH50vadeNHtgX+Y3Qrac73/jLHvk3tffZaTZK/ghzL6z6mcWP/vBG2PHVw/1k8fyJKaPW6vETn+U7bGTfbX8rKxXPB97qKd7jfKA7JIFfoJXZbzgJyucPVXOUItBgiM/C++aN6LqUV71hoT0FivxcaBxBkzoWg7/bMiSZQiep7hN3oYgJdVNvVxwSonUJTmwLjqgpejUBacmRwdERzQe7gIvlKHH3LVYhKBF2IqAq6uwnSeJ/1grsGVikx6WRdu1GtJnm0KsKJYH3cq74OnyyDA6lbc8S1CSgiO9qgyQnNvF7bfzq8vz2f/69XJtBbWTN7MtgYWKiA2YCmdmDRUimzotEqiZo71fO84gwWMt0nIJ1R0d2HPbsvu1SLCt++tokDrJAJR07Pb66suMkm7+uJj9cdGvoi2xfYD/jjc3elI0UwjUqwixrmhwTEXT6vr7ULRXrWfoFPQMSXr29frrt9sfb0nRlB5tVEVzTl3RnFetafYpaJqM+51//f1mgj6x/29I22xbt7a5Eqfpo0dsFIB+WV7dv/PDyJ9TJtMn3c3T3ZweBAmVgSSVuoCyJmvy2Y/CVUyPF5RFmLL0gjEwXPjROb+wCYOgUGKchv/mX1R0IM/HaLvWxcS6Ym1RvU0LFQZSV8UkxsMg4ZYp95IaCR+qlwDSaRRPDbIy2po4rTauvM16apwQP1j4KVO+kKnLklmvPs1cYGE3MFVmzoVzZNv9KFBl1o6N3s3BFMh+V6D2CtQa9LU6KtCLcDhHcKLl+OMh3Mtz3E7l7WfKA+AZT1UYBlizoF6JdtqLtA7RNLva9peJpoMakgDcI0VHqDCQ6GiNJpopltM2x3r91hB6OkTOFSTI5LHiIZETywNgHSujQo2BBg7kkYMJtCOWXWwbsmv/s2Ozfy42tH9CmlCc06vG9oF+5nJgFPRpxsSUXTNr11i6MuUZCrvGk5SqTXq04n/9DYusIvmsFqhxcrNKeZYXnie1JgtKSaiCv/wG+Bd9movlKW0r0tZSq/2xiF+L8DJ7nn95KYl7n3F2leeLH37ix/zGkjBefaRHs938FvtB44lrTycYqXTtb9lhkk9Sq9sWQhtfRvnEMBbwHgiLX4q7dIuFhVE9pEj5q5l69VjYGSwWfp//cIT1t9pa/2EmQEjW1xKhoIHnPAB5RtifeJ6SxU+cyfhPl9x0DoJgaaj0ERgO8noCdwBAIuLtnMlTkpTpKRpqRhJwJCZ/icPsA4PaKr/wUeL2m0DbpEl5NlSgBePCbTIqeosXmGkxNOatQJo32jUKJBTYY3ZNZbhPKXep+64W40M9uq+yH553X3AY9wVFLB0ZZwxg9yz+2cqZHZsUgSp6EiD8g0mOXMEVZHWAHAdqnb1zYpFYe1HuBZVURmJnwECm6ULHcm3YhHpMcRXEwHFZyY6aj/p2Hxf+6SdLqIyUpkkhiXM3NclX9yzD1ZuI2IDCC406nAD1jsc1vBBs6YVA0ws9O++uT9WFbVW3axLVrVehpFIlLhGEdyUscb/PfhjmUeEWtSKKWhUcFM/TbQGecEXNgZMAL/1dlHVpzo8ico+Dv0gSViDPHov5pQHLvPhLtvmCvGbTtJ379Pnmu9idMnYuUChNZggJK1lMT5E2qib/DobkwPdRzSOMUGskxxkofnC9s/1CFhM6DWlCY8cPMq5TIQ1ne8S2MlLcQjHUdipi23Uc9/TCC0OY1G8iObyAyhVqg+m1DAfNqDgwYcIPeEE1kjL6MhdJP6B/NjhN/RWmRtiImBafYCeIMR5SIQ3jxnjuu3Ftb1y9lsYV9bJ0q8UgZbXudCx76kk6+y1b54nYTbRb5SmYjF29/rlckmbSFFg/YI7k7Pe/C5gDw1MYy1ERcySP1r8j5oX6qMCKUSFzBHU6spODzJHZ1pXZw7gyETL3PC2QOSh34WmNmdMaI4Dm6H0u/BDC7A4Wlz0BmnsjT2Yo2aETNA+MBcZQFbZZtgWcoRIqr+3CvsESKqR394yGHzoB0By1XanXOaXq1quqPbokKLlIhfrDyzOcZrBDY4dmY44HWo9kBkTQ2rN0g9ZI6wLfU/Pfblsj0Mvk82NBa3ds/y1PzHptoPVIei2C1q4CAhgXtEYyADbDyV3P6xf1BE+uYpvAcYOnsuF3q9nCapqtV4kNlMKLaLQoFgObSVNG45pYtIwIddo7wQ+8QLnDpI9NgOBASmnrh6FNKDH6PwRDi/CP9nnbprxDzzsK3YjwtYHQ5ivaqO4EQGizbfJvDoTbiSA0bGLQwDVbebTOKDQ6EoPmru9geWkSV6P8MIi1qXnb2NOK3dpKvjXcgrsnEOupaCGHDuVkeG10xHrs+E5fkqUXmjo1hNpsC051TrO69aoMGh1CqIvs6LJQoX7RanBMY9rR6ZFUXvTG1WrjZ9HpwTK698Xx7Q1AiUE+76t72Y3nWHR6CkZ21iU/XjE8PZJii/D0VLED3LjwtCVjYv3D05qQsCnQPrhvv/lZOVrfjGE5LS2t1i0tLXndwu3NZRNvOxTbpFs/LmkVVHd2s5tHYbr+wFc4fKxFQvUKvSoxBoGFHVXPeraD/L72BULNfYEc1RCTM6oKa91a4SgVpic3OAnpczM/+mrCobb5kNV1hVk3LZXzoV2qCDPK7bZ2m+h8wZYc7aHv3/w5jm5IGubIA7qakywjm4mMjedbijV26tplURhTpSpfNWn0o06OIbw8R/H2QqTQJnswbZKBmxmOg0m1WosJCmHfTH+5mVN0wpsYj0ACSOuo9gNW2brBhiNs2VPdJOQuDFi3NHbiKzqJNIZP3+i+57bwgijYdqu6F3QTPd2/tbXIv/Zvv0XX/wc=</diagram></mxfile> \ No newline at end of file
+<mxfile host="Electron" modified="2021-06-17T18:52:07.846Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="eUG-4GjLSDkFvWQsKkB7" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5u4Gv41nml3xhkkcf0YO222M5vTnHg7bT+dwUa22WLLCziX/fVHAoFBkmMcrsmmlwSEJOC9v48ujNB083gdurv1DfFwMIKa9zhCVyMIoWZC+ouVPKUlCAIjLVmFvpeWgUPBzP8H80KNl+59D0elijEhQezvyoULst3iRVwqc8OQPJSrLUlQvuvOXWGpYLZwA7n0u+/F6+zFNO1w4Xfsr9axeGXjZrV5QbR2PfJQKEKfRmgaEhKnR5vHKQ4Y+TLCpO0+H7maP1mIt3GVBlPdNL9Mrrdj6wcaz7b36x/kemzqaTf3brDnr8yfNn7KaBCS/dbDrBdthCYPaz/Gs527YFcfKNtp2TreBPQM0MOlHwRTEpAwaYs8F9vLBS2P4pD8woUr5sLG8yW9Ir8Hf7V7HMb4sVDE3+sakw2OwydahV9FNqcxFzPd4OcPJZ6lZesiuxxe6HI5WeV9HyhJDzgxzyCsYbZL2KXB/ioJm/xhLcg2LpSnf5ohONBAmeKWJVNctxUUR6bRFsWNt01xWKb4OBfoHknesvXom+SCkCPN6JvijkRf7FGHxU9JGK/Jimzd4NOhdFLmwKHOH4TsON3/wnH8xL2vu49JmSv40Y9/sOYXBj/7yTtjx1ePxZOn7GRLX7fQiJ3+zPpjJ4dmyVnW7ijfIrIPF/gZ0mThgRuucPxcPc4wRrhnxSDEgRv79+VIQMVR3vSW+PSZc/GxoHYBdGgbFv9ZkiVDEzxP+ty8D0FK8od6ueBkEtmX5MCi6ICKolMUnIIcHREd0XjYC7xQhh5z22ARQhfClsVXDQvbZRi6T4UKOyY20XFZNG2jJH2mLsSKYn1Qr74Nnq+PNK1WfcMxBCVJKdKoymQO4+DcJndfL6+ml7M/m/VyNQS1ujczDYGEiogN6ApnZrQVIut9WiRQMEcHv3aeQYLnWqTlEqoZ7Zlz0zBrWSRY1f01bZBqyQCUdOzu09WXGS26/TaZfZs0q2hLbB6hv+XMtYYUTRcC9TxCLCoa7FLRenX9TSjakPQMvUo9Q5Ke3Xy6+Xr38y0pmtKjdapo1mtXNGtImma+Sk2Tcb/Lm//ejtBn9u8NaZtp9q1ttkRp+uoBGwWgN0uau/euH7hzSmT68vt5tJ/TAy+kMhBGEgsoHeIynd3AX23p8YJSDVOSThi1/IUbXPILG9/zUiXGkf9PcqOUgTwfo/0ak5FxxfqiehulKgwkVm3JFreDhBu6zCU1Et4WlwDq0ygOHLLSqpq4Ydm47LmLqXFIXG/hRkz5fKYuS2a9mjRznoFtT1eZORvOkfmsrzkjTTZfGL3rrSmQ+a5Ap/TitAIZTSvQi3A4S3Ci2fjjMdzLsexa9c0T9QFwtOcatAOsGbBfibaqi3QHoqk3bttfJpoWKkkCsM8UHaFBS6LTazRRTrGsqjnW4KwhdAYhcrYgQTqPFY+JnFgfAONcGRVatDRwII8cjKAZsOxiV5Jd8+89m/0z2VCG+TShuKRXtd0j/ZnIgZaWj2MmpuyaXrjG0pUxz1DYNZ6k5H3SoxX/7W5YZBXIZ4VAjReXm2RnSeV5WOgyLckK8uAveQB+o89zsT4t24lla6nX5kjErwV4GZ+mX1JLot41jq+SfPHDL/yUPFjob1cf6dFsP7/Drld648LbCUYqWrs7dhgmk9SKtoXQzpdBMjGMBbxHwuIGcZczYmFhVA8pUv58pl4xFrZai4Xf5z+cDHFPW/+OJkBI1tcQoaCW5zwAeUbYdzyPyOIXjmX8p05uOgeet9RU+gg0CzkNgTsAIBHxti7kKUnK9BS1NSMJWBKRv2z9+AOD2nK/8FGi9ptA26RJeSZUoAXdwm0yKnqHF5hpMdTmlUCaN8oaBRIKzC5Zk1vy15S7FH1XhfGhl7uvjOyn3RfsyH1BEUtH2gUD2B2D/6zkzM5NikAePQkQ/tEkR25gC7LaQo4De529M+xIrLoot4NKKiOxC6AhXbehZdgmLEM9urgKouW4LKNPwUd9fdim/ukXS6i0iKZJPtkmbmqUrO5Z+qs3EbEBhRfqdDgB9jseV/JCsKIXAmUvdHLeXQ3VhVVVt/Ekqh5XoaRSGS7h+fcZLPFwyH4Y5pHjFoUqilY5HLSdR7sUPOGKmgAnHl66+yCu050bBOQBe/8joZ+DPAcs5rcSLPPim+ySBXnlrmk/D9Hp7uvYnSx2TlGonswQElay6I4ibVRN/m0NyYHvo5onw4LTRsjqKn6wnYvDQhY9u2+GC3YdP8i4To40XBwQ29xIcQvFUNuxiG0XcdzXF15owqR+HcnhBVSuUGtNr2U4aEbFgQkTfsQLqpGU0NNEJF2P/trgKHJXmBphLWBa/AqZIMZ4SIU0dBvj2e/G9ShtnIrGFbWzdKvCIGW+7rQre+pIOvs1XieJ2G2wXyUpmIxdDX8ul6SZNAXuHzBHcvb77wXMgeYojGWniDmSR+vfEfNUfVRgRaeQOYJ9OrKhQ+aZhzrtysyOXJkImTtOL5A5yHbhqYyZ0xYdgObofS58A8JsdxeXPQOaOx1PZsjo0ydo7mkLjKEqbDNMA1htJVRO1YV9rSVUqN/dM0p+aHigOaq6Uq/5lKoeV1V7dElQcpoKNYeXxziKYY3Ojs3G7A607sgMiKC1Y/QNWqNeF/gO3H/bVY1AO5PPzwWt7a79tzwxa2igdUd6LYLWtgIC6Ba0RjIANsPhfcPrF/sJnmzFNoHdBk9Zx+9W8/jir9OrxLpK4UU0WhSLls2kLqNxZSxaRoRq7Z3geo6n3GHSxTpAsCWlNPuHoXUoEfpfBEOL8E/v87Z1eYeedxS6FOH3BkLrA9qobnggtF41+de7wu1EEBqWMWhg65U8Wm0UGp2JQXPXd7S+NImrVL8dxFrvedvYQcduVSU/y3d6RqzHooVsO5ST4bXOEeuu47v+kqx+oamBI9R6VXCq+TSrHldl0OgYQp1mR9NUhZpFq8E5nfWOTnek8qI3zlcbn0SnW8vo3hfHn3TBp311O7vxnItOj0HHzjoj0IDh6Y4UW4Snx4od4LqFpw0ZE2senu4JCRuD3gf3zTc/K6fLL2MYVkVLO6wtLQ153cLd7bSMtx2LbaKdu83Kcqju4nY/D/xo/YGvcPhYiISKDRpVYgw8A1sqzjqmhdym9gVC5X2BLNUQk9WpCve6tcJZKkxPbnHo0/dmfrSvcKhqPmQ0vsKsnpbK+dA+UoQZ2XZb+01wuWBLjg7Q9x/uHAe3JPIT5AFdzUkck81IxsaTLcVKO3Xt48DfUqXKPjX5LA+qq5OlCR/PUXy9ECm0yWxNm2TgZoa33ihfrcUkh7A70//czCmY8CbGI5AA0lqq/YBVtq614QhT9lS3Ibn3PcaW0k58KZNIafj0je57bgofiIJVt6prgk0T6/Y/3/+cgL+nd5vV1PxBfp99U3096jCKV8flHyGNgoBHqWUIm1wD3aw4HN2EB1eSC0rkSjPOxLWsfCo3LoeKo8Uab9zBinEtxojD0nqXK3CUfBnQ2Gc73w5SxjcnoyolrRSfBjpuGjoIoZ57yIF9cauW1ogDvtScVcQUmvg2iZLK8g4F/NskUXo3Su0Z99rDDZxqMUUHIlMUYa0SIG/NlslhbZEJ2oIl5douj6W8zAHJmf/cjTCLhxOHROOrQztWmo5j7nOPlWwdXJnHfFRj+ByW1E61uZUyc3kBUkpPQ8I26TmA3tT4rG+Ih1mN/wM=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 3b771746..c43b2e4c 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -1,25 +1,37 @@
package broadcast
import (
+ "fmt"
"sync"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
-const PluginName string = "broadcast"
+const (
+ PluginName string = "broadcast"
+ // driver is the mandatory field which should present in every storage
+ driver string = "driver"
+
+ redis string = "redis"
+ memory string = "memory"
+)
type Plugin struct {
sync.RWMutex
- log logger.Logger
+
+ cfg *Config
+ cfgPlugin config.Configurer
+ log logger.Logger
// publishers implement Publisher interface
// and able to receive a payload
- publishers map[string]pubsub.Publisher
+ publishers map[string]pubsub.PubSub
+ providers map[string]pubsub.PSProvider
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
@@ -27,9 +39,95 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
+ p.cfg = &Config{}
+ // unmarshal config section
+ err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.publishers = make(map[string]pubsub.PubSub)
+ p.providers = make(map[string]pubsub.PSProvider)
- p.publishers = make(map[string]pubsub.Publisher)
p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("broadcast_plugin_serve")
+ errCh := make(chan error, 1)
+
+ // iterate over config
+ for k, v := range p.cfg.Data {
+ if v == nil {
+ continue
+ }
+
+ switch t := v.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
+ return errCh
+ }
+ default:
+ p.log.Warn("wrong type detected in the configuration, please, check yaml indentation")
+ continue
+ }
+
+ // config key for the particular sub-driver kv.memcached
+ configKey := fmt.Sprintf("%s.%s", PluginName, k)
+
+ switch v.(map[string]interface{})[driver] {
+ case memory:
+ if _, ok := p.providers[memory]; !ok {
+ p.log.Warn("no memory drivers registered", "registered", p.publishers)
+ continue
+ }
+ ps, err := p.providers[memory].PSProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the pubsub
+ p.publishers[k] = ps
+ case redis:
+ if _, ok := p.providers[redis]; !ok {
+ p.log.Warn("no redis drivers registered", "registered", p.publishers)
+ continue
+ }
+
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ ps, err := p.providers[redis].PSProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the pubsub
+ p.publishers[k] = ps
+ case p.cfgPlugin.Has(redis):
+ ps, err := p.providers[redis].PSProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the pubsub
+ p.publishers[k] = ps
+ continue
+ }
+ }
+ }
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
return nil
}
@@ -40,8 +138,9 @@ func (p *Plugin) Collects() []interface{} {
}
// CollectPublishers collect all plugins who implement pubsub.Publisher interface
-func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) {
- p.publishers[name.Name()] = subscriber
+func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) {
+ // key redis, value - interface
+ p.providers[name.Name()] = subscriber
}
// Publish is an entry point to the websocket PUBSUB
@@ -88,21 +187,25 @@ func (p *Plugin) PublishAsync(m []byte) {
// Get payload
for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.publishers[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- p.log.Error("publish async error", "error", err)
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ p.publishers[j].PublishAsync(m)
}
- } else {
- p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker())
+ return
}
+ p.log.Warn("no publishers registered")
}
}()
}
-func (p *Plugin) GetDriver(key string) pubsub.SubReader {
- println(key)
- return nil
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
+ const op = errors.Op("broadcast_plugin_get_driver")
+ // key - driver, default for example
+ // we should find `default` in the collected pubsubs providers
+ if pub, ok := p.publishers[key]; ok {
+ return pub, nil
+ }
+ return nil, errors.E(op, errors.Str("could not find driver by provided key"))
}
func (p *Plugin) RPC() interface{} {
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index efe92252..716e0d4c 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -85,6 +85,11 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
when user requests for example boltdb-south, we should provide that particular preconfigured storage
*/
for k, v := range p.cfg.Data {
+ // for example if the key not properly formatted (yaml)
+ if v == nil {
+ continue
+ }
+
if _, ok := v.(map[string]interface{})[driver]; !ok {
errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
return errCh
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index 6151ebf0..d4d535bf 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,7 +2,7 @@ package memory
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index 75cd9d24..02246a8f 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -4,8 +4,8 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 24c21b55..8d997041 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -5,7 +5,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index dbda7ea4..c2a88abe 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,8 +6,8 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
@@ -62,6 +62,11 @@ func NewPubSubDriver(log logger.Logger, key string, cfgPlugin config.Configurer,
MasterName: ps.cfg.MasterName,
})
+ statusCmd := ps.universalClient.Ping(context.Background())
+ if statusCmd.Err() != nil {
+ return nil, statusCmd.Err()
+ }
+
ps.fanin = newFanIn(ps.universalClient, log)
ps.stop()
diff --git a/plugins/websockets/config.go b/plugins/websockets/config.go
index b1d5d0a8..933a12e0 100644
--- a/plugins/websockets/config.go
+++ b/plugins/websockets/config.go
@@ -4,6 +4,7 @@ import (
"strings"
"time"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pool"
)
@@ -17,9 +18,9 @@ websockets:
// Config represents configuration for the ws plugin
type Config struct {
// http path for the websocket
- Path string `mapstructure:"path"`
-
+ Path string `mapstructure:"path"`
AllowedOrigin string `mapstructure:"allowed_origin"`
+ Broker string `mapstructure:"broker"`
// wildcard origin
allowedWOrigins []wildcard
@@ -31,11 +32,16 @@ type Config struct {
}
// InitDefault initialize default values for the ws config
-func (c *Config) InitDefault() {
+func (c *Config) InitDefault() error {
if c.Path == "" {
c.Path = "/ws"
}
+ // broker is mandatory
+ if c.Broker == "" {
+ return errors.Str("broker key should be specified")
+ }
+
if c.Pool == nil {
c.Pool = &pool.Config{}
if c.Pool.NumWorkers == 0 {
@@ -64,7 +70,7 @@ func (c *Config) InitDefault() {
if origin == "*" {
// If "*" is present in the list, turn the whole list into a match all
c.allowedAll = true
- return
+ return nil
} else if i := strings.IndexByte(origin, '*'); i >= 0 {
// Split the origin in two: start and end string without the *
w := wildcard{origin[0:i], origin[i+1:]}
@@ -72,4 +78,6 @@ func (c *Config) InitDefault() {
} else {
c.allowedOrigins = append(c.allowedOrigins, origin)
}
+
+ return nil
}
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 07f22043..799312ad 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,8 +7,8 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
@@ -28,8 +28,8 @@ type Executor struct {
// associated connection ID
connID string
- // map with the pubsub drivers
- pubsub map[string]pubsub.Subscriber
+ // subscriber drivers
+ sub pubsub.Subscriber
actualTopics map[string]struct{}
req *http.Request
@@ -38,12 +38,12 @@ type Executor struct {
// NewExecutor creates protected connection and starts command loop
func NewExecutor(conn *connection.Connection, log logger.Logger,
- connID string, pubsubs map[string]pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
+ connID string, sub pubsub.Subscriber, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
log: log,
- pubsub: pubsubs,
+ sub: sub,
accessValidator: av,
actualTopics: make(map[string]struct{}, 10),
req: r,
@@ -126,11 +126,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
// subscribe to the topic
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Set(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Set(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
// handle leave
@@ -155,11 +153,9 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
return errors.E(op, err)
}
- if br, ok := e.pubsub[msg.Broker]; ok {
- err = e.Leave(br, msg.Topics)
- if err != nil {
- return errors.E(op, err)
- }
+ err = e.Leave(msg.Topics)
+ if err != nil {
+ return errors.E(op, err)
}
case commands.Headers:
@@ -170,13 +166,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
}
}
-func (e *Executor) Set(br pubsub.Subscriber, topics []string) error {
+func (e *Executor) Set(topics []string) error {
// associate connection with topics
- err := br.Subscribe(e.connID, topics...)
+ err := e.sub.Subscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
// in case of error, unsubscribe connection from the dead topics
- _ = br.Unsubscribe(e.connID, topics...)
+ _ = e.sub.Unsubscribe(e.connID, topics...)
return err
}
@@ -188,9 +184,9 @@ func (e *Executor) Set(br pubsub.Subscriber, topics []string) error {
return nil
}
-func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error {
+func (e *Executor) Leave(topics []string) error {
// remove associated connections from the storage
- err := br.Unsubscribe(e.connID, topics...)
+ err := e.sub.Unsubscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
return err
@@ -207,10 +203,7 @@ func (e *Executor) Leave(br pubsub.Subscriber, topics []string) error {
func (e *Executor) CleanUp() {
// unsubscribe particular connection from the topics
for topic := range e.actualTopics {
- // here
- for _, ps := range e.pubsub {
- _ = ps.Unsubscribe(e.connID, topic)
- }
+ _ = e.sub.Unsubscribe(e.connID, topic)
}
// clean up the actualTopics data
diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go
index e877fad3..ec6e1960 100644
--- a/plugins/websockets/origin_test.go
+++ b/plugins/websockets/origin_test.go
@@ -11,7 +11,8 @@ func TestConfig_Origin(t *testing.T) {
AllowedOrigin: "*",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("http://some.some.some.sssome", cfg))
assert.True(t, isOriginAllowed("http://", cfg))
@@ -29,7 +30,8 @@ func TestConfig_OriginWildCard(t *testing.T) {
AllowedOrigin: "https://*my.site.com",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("https://my.site.com", cfg))
assert.False(t, isOriginAllowed("http://", cfg))
@@ -50,7 +52,8 @@ func TestConfig_OriginWildCard2(t *testing.T) {
AllowedOrigin: "https://my.*.com",
}
- cfg.InitDefault()
+ err := cfg.InitDefault()
+ assert.NoError(t, err)
assert.True(t, isOriginAllowed("https://my.site.com", cfg))
assert.False(t, isOriginAllowed("http://", cfg))
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index cf861c72..de7443fd 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -9,13 +9,12 @@ import (
"github.com/fasthttp/websocket"
"github.com/google/uuid"
json "github.com/json-iterator/go"
- endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/interface/broadcast"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -33,16 +32,14 @@ const (
type Plugin struct {
sync.RWMutex
- // Collection with all available pubsubs
- //pubsubs map[string]pubsub.PubSub
- //psProviders map[string]pubsub.PSProvider
+ // subscriber+reader interfaces
+ subReader pubsub.SubReader
+ // broadcaster
+ broadcaster broadcast.Broadcaster
- subReaders map[string]pubsub.SubReader
-
- cfg *Config
- cfgPlugin config.Configurer
- log logger.Logger
+ cfg *Config
+ log logger.Logger
// global connections map
connections sync.Map
@@ -53,8 +50,10 @@ type Plugin struct {
wsUpgrade *websocket.Upgrader
serveExit chan struct{}
+ // workers pool
phpPool phpPool.Pool
- server server.Server
+ // server which produces commands to the pool
+ server server.Server
// function used to validate access to the requested resource
accessValidator validator.AccessValidatorFn
@@ -71,14 +70,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
return errors.E(op, err)
}
- p.cfg.InitDefault()
- //p.pubsubs = make(map[string]pubsub.PubSub)
- //p.psProviders = make(map[string]pubsub.PSProvider)
-
- p.subReaders = make(map[string]pubsub.SubReader)
-
- p.log = log
- p.cfgPlugin = cfg
+ err = p.cfg.InitDefault()
+ if err != nil {
+ return errors.E(op, err)
+ }
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
@@ -90,19 +85,21 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.serveExit = make(chan struct{})
p.server = server
-
+ p.log = log
+ p.broadcaster = b
return nil
}
func (p *Plugin) Serve() chan error {
- errCh := make(chan error, 1)
const op = errors.Op("websockets_plugin_serve")
-
- //err := p.initPubSubs()
- //if err != nil {
- // errCh <- errors.E(op, err)
- // return errCh
- //}
+ errCh := make(chan error, 1)
+ // init broadcaster
+ var err error
+ p.subReader, err = p.broadcaster.GetDriver(p.cfg.Broker)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
go func() {
var err error
@@ -124,78 +121,28 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
- p.workersPool = pool.NewWorkersPool(p.subReaders, &p.connections, p.log)
+ p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log)
// run all pubsubs drivers
- for _, v := range p.subReaders {
- go func(ps pubsub.SubReader) {
- for {
- select {
- case <-p.serveExit:
+ go func(ps pubsub.Reader) {
+ for {
+ select {
+ case <-p.serveExit:
+ return
+ default:
+ data, err := ps.Next()
+ if err != nil {
+ errCh <- err
return
- default:
- data, err := ps.Next()
- if err != nil {
- errCh <- err
- return
- }
- p.workersPool.Queue(data)
}
+ p.workersPool.Queue(data)
}
- }(v)
- }
+ }
+ }(p.subReader)
return errCh
}
-//func (p *Plugin) initPubSubs() error {
-// for i := 0; i < len(p.cfg.PubSubs); i++ {
-// // don't need to have a section for the in-memory
-// if p.cfg.PubSubs[i] == "memory" {
-// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
-// r, err := provider.PSProvide("")
-// if err != nil {
-// return err
-// }
-//
-// // append default in-memory provider
-// p.pubsubs["memory"] = r
-// }
-// continue
-// }
-// // key - memory, redis
-// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
-// // try local key
-// switch {
-// // try local config first
-// case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
-// r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
-// if err != nil {
-// return err
-// }
-//
-// // append redis provider
-// p.pubsubs[p.cfg.PubSubs[i]] = r
-// case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
-// r, err := provider.PSProvide(p.cfg.PubSubs[i])
-// if err != nil {
-// return err
-// }
-//
-// // append redis provider
-// p.pubsubs[p.cfg.PubSubs[i]] = r
-// default:
-// return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
-// }
-// } else {
-// // no such driver
-// p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
-// }
-// }
-//
-// return nil
-//}
-
func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
@@ -210,23 +157,12 @@ func (p *Plugin) Stop() error {
return nil
}
-func (p *Plugin) Collects() []interface{} {
- return []interface{}{
- p.GetSubsReader,
- }
-}
-
func (p *Plugin) Available() {}
func (p *Plugin) Name() string {
return PluginName
}
-// GetSubsReader collects all plugins which implement SubReader interface
-func (p *Plugin) GetSubsReader(name endure.Named, pub pubsub.SubReader) {
- p.subReaders[name.Name()] = pub
-}
-
func (p *Plugin) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != p.cfg.Path {
@@ -272,7 +208,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.connections.Store(connectionID, safeConn)
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, connectionID, nil, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, p.subReader, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
err = e.StartCommandLoop()
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 22042d8d..cd9444da 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,15 +4,15 @@ import (
"sync"
json "github.com/json-iterator/go"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
- storage map[string]pubsub.SubReader
+ subscriber pubsub.Subscriber
connections *sync.Map
resPool sync.Pool
log logger.Logger
@@ -22,11 +22,11 @@ type WorkersPool struct {
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(pubsubs map[string]pubsub.SubReader, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
queue: make(chan *websocketsv1.Message, 100),
- storage: pubsubs,
+ subscriber: subscriber,
log: log,
exit: make(chan struct{}),
}
@@ -90,19 +90,13 @@ func (wp *WorkersPool) do() { //nolint:gocognit
continue
}
- br, ok := wp.storage[msg.Broker]
- if !ok {
- wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage)
- continue
- }
-
// send a message to every topic
for i := 0; i < len(msg.GetTopics()); i++ {
// get free map
res := wp.get()
// get connections for the particular topic
- br.Connections(msg.GetTopics()[i], res)
+ wp.subscriber.Connections(msg.GetTopics()[i], res)
if len(res) == 0 {
wp.log.Info("no such topic", "topic", msg.GetTopics()[i])
@@ -114,7 +108,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for topic := range res {
c, ok := wp.connections.Load(topic)
if !ok {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
+ wp.log.Warn("the user disconnected connection before the message being written to it", "topics", msg.GetTopics()[i])
wp.put(res)
continue
}
@@ -135,7 +129,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit
err = c.(*connection.Connection).Write(d)
if err != nil {
for i := 0; i < len(msg.GetTopics()); i++ {
- wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
+ wp.log.Error("error sending payload over the connection", "error", err, "topics", msg.GetTopics()[i])
}
wp.put(res)
continue