diff options
Diffstat (limited to 'plugins/broadcast')
-rw-r--r-- | plugins/broadcast/config.go | 6 | ||||
-rw-r--r-- | plugins/broadcast/doc/broadcast_arch.drawio | 2 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 135 |
3 files changed, 126 insertions, 17 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go index 18846f30..4f1e5213 100644 --- a/plugins/broadcast/config.go +++ b/plugins/broadcast/config.go @@ -1,6 +1,9 @@ package broadcast /* + +# Global redis config (priority - 2) + websockets: # <----- one of possible subscribers path: /ws broker: default # <------ broadcast broker to use --------------- | @@ -8,9 +11,12 @@ websockets: # <----- one of possible subscribers broadcast: # <-------- broadcast entry point plugin | default: # <----------------------------------------------------- | driver: redis + # local redis config (priority - 1) test: driver: memory + +priority local -> global */ // Config ... diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio index b8d2947e..b2ee091a 100644 --- a/plugins/broadcast/doc/broadcast_arch.drawio +++ b/plugins/broadcast/doc/broadcast_arch.drawio @@ -1 +1 @@ -<mxfile host="Electron" modified="2021-06-17T16:23:35.917Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="1VvfJYAxL9mW7TkXHKVj" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5s4F/41nml3xhkkcf2YW7ud2U6z8buz7acdbGSbLUZewLnsr38lEBgkOcHhojibtHHgIMlwdK6PLkzQ5ebhc+Jv119JgKMJNIKHCbqaQAhNaNM/jPJYUBDyYEFZJWFQ0MCeMAv/xZxocOouDHDaKJgREmXhtklckDjGi6xB85OE3DeLLUnU/Natv8ISYbbwI5n6Zxhka/5ghmHsL/yKw9U6E69s/LI0J6RrPyD3NRK6nqDLhJCsONo8XOKIsa9kTFHv04Gr1Z0lOM7aVLg0bfvLxed46nxH01l8t/5OPk9ts2jmzo92/JH53WaPJQ8SsosDzFoxJujifh1meLb1F+zqPe12Sltnm4ieAXq4DKPokkQkyeuiwMfuckHpaZaQn7h2xV64eL6kV/gN4CTDDwcfDVQMo7KGyQZnySMtwisgl/OYi5lp8fP7Rp8VtHW9uzxO9LmcrKq295ykB5yZRzDWsodl7NJi/5SMzX9YDRJnNXrx0w/DgQGaHHccmeOmq+A4sq2hOG69bY7DJsenlUBrZPnA1kM3ywUhR4alm+OexF8cUIfFT0mSrcmKxH50vadeNHtgX+Y3Qrac73/jLHvk3tffZaTZK/ghzL6z6mcWP/vBG2PHVw/1k8fyJKaPW6vETn+U7bGTfbX8rKxXPB97qKd7jfKA7JIFfoJXZbzgJyucPVXOUItBgiM/C++aN6LqUV71hoT0FivxcaBxBkzoWg7/bMiSZQiep7hN3oYgJdVNvVxwSonUJTmwLjqgpejUBacmRwdERzQe7gIvlKHH3LVYhKBF2IqAq6uwnSeJ/1grsGVikx6WRdu1GtJnm0KsKJYH3cq74OnyyDA6lbc8S1CSgiO9qgyQnNvF7bfzq8vz2f/69XJtBbWTN7MtgYWKiA2YCmdmDRUimzotEqiZo71fO84gwWMt0nIJ1R0d2HPbsvu1SLCt++tokDrJAJR07Pb66suMkm7+uJj9cdGvoi2xfYD/jjc3elI0UwjUqwixrmhwTEXT6vr7ULRXrWfoFPQMSXr29frrt9sfb0nRlB5tVEVzTl3RnFetafYpaJqM+51//f1mgj6x/29I22xbt7a5Eqfpo0dsFIB+WV7dv/PDyJ9TJtMn3c3T3ZweBAmVgSSVuoCyJmvy2Y/CVUyPF5RFmLL0gjEwXPjROb+wCYOgUGKchv/mX1R0IM/HaLvWxcS6Ym1RvU0LFQZSV8UkxsMg4ZYp95IaCR+qlwDSaRRPDbIy2po4rTauvM16apwQP1j4KVO+kKnLklmvPs1cYGE3MFVmzoVzZNv9KFBl1o6N3s3BFMh+V6D2CtQa9LU6KtCLcDhHcKLl+OMh3Mtz3E7l7WfKA+AZT1UYBlizoF6JdtqLtA7RNLva9peJpoMakgDcI0VHqDCQ6GiNJpopltM2x3r91hB6OkTOFSTI5LHiIZETywNgHSujQo2BBg7kkYMJtCOWXWwbsmv/s2Ozfy42tH9CmlCc06vG9oF+5nJgFPRpxsSUXTNr11i6MuUZCrvGk5SqTXq04n/9DYusIvmsFqhxcrNKeZYXnie1JgtKSaiCv/wG+Bd9movlKW0r0tZSq/2xiF+L8DJ7nn95KYl7n3F2leeLH37ix/zGkjBefaRHs938FvtB44lrTycYqXTtb9lhkk9Sq9sWQhtfRvnEMBbwHgiLX4q7dIuFhVE9pEj5q5l69VjYGSwWfp//cIT1t9pa/2EmQEjW1xKhoIHnPAB5RtifeJ6SxU+cyfhPl9x0DoJgaaj0ERgO8noCdwBAIuLtnMlTkpTpKRpqRhJwJCZ/icPsA4PaKr/wUeL2m0DbpEl5NlSgBePCbTIqeosXmGkxNOatQJo32jUKJBTYY3ZNZbhPKXep+64W40M9uq+yH553X3AY9wVFLB0ZZwxg9yz+2cqZHZsUgSp6EiD8g0mOXMEVZHWAHAdqnb1zYpFYe1HuBZVURmJnwECm6ULHcm3YhHpMcRXEwHFZyY6aj/p2Hxf+6SdLqIyUpkkhiXM3NclX9yzD1ZuI2IDCC406nAD1jsc1vBBs6YVA0ws9O++uT9WFbVW3axLVrVehpFIlLhGEdyUscb/PfhjmUeEWtSKKWhUcFM/TbQGecEXNgZMAL/1dlHVpzo8ico+Dv0gSViDPHov5pQHLvPhLtvmCvGbTtJ379Pnmu9idMnYuUChNZggJK1lMT5E2qib/DobkwPdRzSOMUGskxxkofnC9s/1CFhM6DWlCY8cPMq5TIQ1ne8S2MlLcQjHUdipi23Uc9/TCC0OY1G8iObyAyhVqg+m1DAfNqDgwYcIPeEE1kjL6MhdJP6B/NjhN/RWmRtiImBafYCeIMR5SIQ3jxnjuu3Ftb1y9lsYV9bJ0q8UgZbXudCx76kk6+y1b54nYTbRb5SmYjF29/rlckmbSFFg/YI7k7Pe/C5gDw1MYy1ERcySP1r8j5oX6qMCKUSFzBHU6spODzJHZ1pXZw7gyETL3PC2QOSh34WmNmdMaI4Dm6H0u/BDC7A4Wlz0BmnsjT2Yo2aETNA+MBcZQFbZZtgWcoRIqr+3CvsESKqR394yGHzoB0By1XanXOaXq1quqPbokKLlIhfrDyzOcZrBDY4dmY44HWo9kBkTQ2rN0g9ZI6wLfU/Pfblsj0Mvk82NBa3ds/y1PzHptoPVIei2C1q4CAhgXtEYyADbDyV3P6xf1BE+uYpvAcYOnsuF3q9nCapqtV4kNlMKLaLQoFgObSVNG45pYtIwIddo7wQ+8QLnDpI9NgOBASmnrh6FNKDH6PwRDi/CP9nnbprxDzzsK3YjwtYHQ5ivaqO4EQGizbfJvDoTbiSA0bGLQwDVbebTOKDQ6EoPmru9geWkSV6P8MIi1qXnb2NOK3dpKvjXcgrsnEOupaCGHDuVkeG10xHrs+E5fkqUXmjo1hNpsC051TrO69aoMGh1CqIvs6LJQoX7RanBMY9rR6ZFUXvTG1WrjZ9HpwTK698Xx7Q1AiUE+76t72Y3nWHR6CkZ21iU/XjE8PZJii/D0VLED3LjwtCVjYv3D05qQsCnQPrhvv/lZOVrfjGE5LS2t1i0tLXndwu3NZRNvOxTbpFs/LmkVVHd2s5tHYbr+wFc4fKxFQvUKvSoxBoGFHVXPeraD/L72BULNfYEc1RCTM6oKa91a4SgVpic3OAnpczM/+mrCobb5kNV1hVk3LZXzoV2qCDPK7bZ2m+h8wZYc7aHv3/w5jm5IGubIA7qakywjm4mMjedbijV26tplURhTpSpfNWn0o06OIbw8R/H2QqTQJnswbZKBmxmOg0m1WosJCmHfTH+5mVN0wpsYj0ACSOuo9gNW2brBhiNs2VPdJOQuDFi3NHbiKzqJNIZP3+i+57bwgijYdqu6F3QTPd2/tbXIv/Zvv0XX/wc=</diagram></mxfile>
\ No newline at end of file +<mxfile host="Electron" modified="2021-06-17T18:52:07.846Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="eUG-4GjLSDkFvWQsKkB7" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5u4Gv41nml3xhkkcf0YO222M5vTnHg7bT+dwUa22WLLCziX/fVHAoFBkmMcrsmmlwSEJOC9v48ujNB083gdurv1DfFwMIKa9zhCVyMIoWZC+ouVPKUlCAIjLVmFvpeWgUPBzP8H80KNl+59D0elijEhQezvyoULst3iRVwqc8OQPJSrLUlQvuvOXWGpYLZwA7n0u+/F6+zFNO1w4Xfsr9axeGXjZrV5QbR2PfJQKEKfRmgaEhKnR5vHKQ4Y+TLCpO0+H7maP1mIt3GVBlPdNL9Mrrdj6wcaz7b36x/kemzqaTf3brDnr8yfNn7KaBCS/dbDrBdthCYPaz/Gs527YFcfKNtp2TreBPQM0MOlHwRTEpAwaYs8F9vLBS2P4pD8woUr5sLG8yW9Ir8Hf7V7HMb4sVDE3+sakw2OwydahV9FNqcxFzPd4OcPJZ6lZesiuxxe6HI5WeV9HyhJDzgxzyCsYbZL2KXB/ioJm/xhLcg2LpSnf5ohONBAmeKWJVNctxUUR6bRFsWNt01xWKb4OBfoHknesvXom+SCkCPN6JvijkRf7FGHxU9JGK/Jimzd4NOhdFLmwKHOH4TsON3/wnH8xL2vu49JmSv40Y9/sOYXBj/7yTtjx1ePxZOn7GRLX7fQiJ3+zPpjJ4dmyVnW7ijfIrIPF/gZ0mThgRuucPxcPc4wRrhnxSDEgRv79+VIQMVR3vSW+PSZc/GxoHYBdGgbFv9ZkiVDEzxP+ty8D0FK8od6ueBkEtmX5MCi6ICKolMUnIIcHREd0XjYC7xQhh5z22ARQhfClsVXDQvbZRi6T4UKOyY20XFZNG2jJH2mLsSKYn1Qr74Nnq+PNK1WfcMxBCVJKdKoymQO4+DcJndfL6+ml7M/m/VyNQS1ujczDYGEiogN6ApnZrQVIut9WiRQMEcHv3aeQYLnWqTlEqoZ7Zlz0zBrWSRY1f01bZBqyQCUdOzu09WXGS26/TaZfZs0q2hLbB6hv+XMtYYUTRcC9TxCLCoa7FLRenX9TSjakPQMvUo9Q5Ke3Xy6+Xr38y0pmtKjdapo1mtXNGtImma+Sk2Tcb/Lm//ejtBn9u8NaZtp9q1ttkRp+uoBGwWgN0uau/euH7hzSmT68vt5tJ/TAy+kMhBGEgsoHeIynd3AX23p8YJSDVOSThi1/IUbXPILG9/zUiXGkf9PcqOUgTwfo/0ak5FxxfqiehulKgwkVm3JFreDhBu6zCU1Et4WlwDq0ygOHLLSqpq4Ydm47LmLqXFIXG/hRkz5fKYuS2a9mjRznoFtT1eZORvOkfmsrzkjTTZfGL3rrSmQ+a5Ap/TitAIZTSvQi3A4S3Ci2fjjMdzLsexa9c0T9QFwtOcatAOsGbBfibaqi3QHoqk3bttfJpoWKkkCsM8UHaFBS6LTazRRTrGsqjnW4KwhdAYhcrYgQTqPFY+JnFgfAONcGRVatDRwII8cjKAZsOxiV5Jd8+89m/0z2VCG+TShuKRXtd0j/ZnIgZaWj2MmpuyaXrjG0pUxz1DYNZ6k5H3SoxX/7W5YZBXIZ4VAjReXm2RnSeV5WOgyLckK8uAveQB+o89zsT4t24lla6nX5kjErwV4GZ+mX1JLot41jq+SfPHDL/yUPFjob1cf6dFsP7/Drld648LbCUYqWrs7dhgmk9SKtoXQzpdBMjGMBbxHwuIGcZczYmFhVA8pUv58pl4xFrZai4Xf5z+cDHFPW/+OJkBI1tcQoaCW5zwAeUbYdzyPyOIXjmX8p05uOgeet9RU+gg0CzkNgTsAIBHxti7kKUnK9BS1NSMJWBKRv2z9+AOD2nK/8FGi9ptA26RJeSZUoAXdwm0yKnqHF5hpMdTmlUCaN8oaBRIKzC5Zk1vy15S7FH1XhfGhl7uvjOyn3RfsyH1BEUtH2gUD2B2D/6zkzM5NikAePQkQ/tEkR25gC7LaQo4De529M+xIrLoot4NKKiOxC6AhXbehZdgmLEM9urgKouW4LKNPwUd9fdim/ukXS6i0iKZJPtkmbmqUrO5Z+qs3EbEBhRfqdDgB9jseV/JCsKIXAmUvdHLeXQ3VhVVVt/Ekqh5XoaRSGS7h+fcZLPFwyH4Y5pHjFoUqilY5HLSdR7sUPOGKmgAnHl66+yCu050bBOQBe/8joZ+DPAcs5rcSLPPim+ySBXnlrmk/D9Hp7uvYnSx2TlGonswQElay6I4ibVRN/m0NyYHvo5onw4LTRsjqKn6wnYvDQhY9u2+GC3YdP8i4To40XBwQ29xIcQvFUNuxiG0XcdzXF15owqR+HcnhBVSuUGtNr2U4aEbFgQkTfsQLqpGU0NNEJF2P/trgKHJXmBphLWBa/AqZIMZ4SIU0dBvj2e/G9ShtnIrGFbWzdKvCIGW+7rQre+pIOvs1XieJ2G2wXyUpmIxdDX8ul6SZNAXuHzBHcvb77wXMgeYojGWniDmSR+vfEfNUfVRgRaeQOYJ9OrKhQ+aZhzrtysyOXJkImTtOL5A5yHbhqYyZ0xYdgObofS58A8JsdxeXPQOaOx1PZsjo0ydo7mkLjKEqbDNMA1htJVRO1YV9rSVUqN/dM0p+aHigOaq6Uq/5lKoeV1V7dElQcpoKNYeXxziKYY3Ojs3G7A607sgMiKC1Y/QNWqNeF/gO3H/bVY1AO5PPzwWt7a79tzwxa2igdUd6LYLWtgIC6Ba0RjIANsPhfcPrF/sJnmzFNoHdBk9Zx+9W8/jir9OrxLpK4UU0WhSLls2kLqNxZSxaRoRq7Z3geo6n3GHSxTpAsCWlNPuHoXUoEfpfBEOL8E/v87Z1eYeedxS6FOH3BkLrA9qobnggtF41+de7wu1EEBqWMWhg65U8Wm0UGp2JQXPXd7S+NImrVL8dxFrvedvYQcduVSU/y3d6RqzHooVsO5ST4bXOEeuu47v+kqx+oamBI9R6VXCq+TSrHldl0OgYQp1mR9NUhZpFq8E5nfWOTnek8qI3zlcbn0SnW8vo3hfHn3TBp311O7vxnItOj0HHzjoj0IDh6Y4UW4Snx4od4LqFpw0ZE2senu4JCRuD3gf3zTc/K6fLL2MYVkVLO6wtLQ153cLd7bSMtx2LbaKdu83Kcqju4nY/D/xo/YGvcPhYiISKDRpVYgw8A1sqzjqmhdym9gVC5X2BLNUQk9WpCve6tcJZKkxPbnHo0/dmfrSvcKhqPmQ0vsKsnpbK+dA+UoQZ2XZb+01wuWBLjg7Q9x/uHAe3JPIT5AFdzUkck81IxsaTLcVKO3Xt48DfUqXKPjX5LA+qq5OlCR/PUXy9ECm0yWxNm2TgZoa33ihfrcUkh7A70//czCmY8CbGI5AA0lqq/YBVtq614QhT9lS3Ibn3PcaW0k58KZNIafj0je57bgofiIJVt6prgk0T6/Y/3/+cgL+nd5vV1PxBfp99U3096jCKV8flHyGNgoBHqWUIm1wD3aw4HN2EB1eSC0rkSjPOxLWsfCo3LoeKo8Uab9zBinEtxojD0nqXK3CUfBnQ2Gc73w5SxjcnoyolrRSfBjpuGjoIoZ57yIF9cauW1ogDvtScVcQUmvg2iZLK8g4F/NskUXo3Su0Z99rDDZxqMUUHIlMUYa0SIG/NlslhbZEJ2oIl5douj6W8zAHJmf/cjTCLhxOHROOrQztWmo5j7nOPlWwdXJnHfFRj+ByW1E61uZUyc3kBUkpPQ8I26TmA3tT4rG+Ih1mN/wM=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 3b771746..c43b2e4c 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -1,25 +1,37 @@ package broadcast import ( + "fmt" "sync" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" "google.golang.org/protobuf/proto" ) -const PluginName string = "broadcast" +const ( + PluginName string = "broadcast" + // driver is the mandatory field which should present in every storage + driver string = "driver" + + redis string = "redis" + memory string = "memory" +) type Plugin struct { sync.RWMutex - log logger.Logger + + cfg *Config + cfgPlugin config.Configurer + log logger.Logger // publishers implement Publisher interface // and able to receive a payload - publishers map[string]pubsub.Publisher + publishers map[string]pubsub.PubSub + providers map[string]pubsub.PSProvider } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { @@ -27,9 +39,95 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if !cfg.Has(PluginName) { return errors.E(op, errors.Disabled) } + p.cfg = &Config{} + // unmarshal config section + err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) + if err != nil { + return errors.E(op, err) + } + + p.publishers = make(map[string]pubsub.PubSub) + p.providers = make(map[string]pubsub.PSProvider) - p.publishers = make(map[string]pubsub.Publisher) p.log = log + p.cfgPlugin = cfg + return nil +} + +func (p *Plugin) Serve() chan error { + const op = errors.Op("broadcast_plugin_serve") + errCh := make(chan error, 1) + + // iterate over config + for k, v := range p.cfg.Data { + if v == nil { + continue + } + + switch t := v.(type) { + // correct type + case map[string]interface{}: + if _, ok := t[driver]; !ok { + errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) + return errCh + } + default: + p.log.Warn("wrong type detected in the configuration, please, check yaml indentation") + continue + } + + // config key for the particular sub-driver kv.memcached + configKey := fmt.Sprintf("%s.%s", PluginName, k) + + switch v.(map[string]interface{})[driver] { + case memory: + if _, ok := p.providers[memory]; !ok { + p.log.Warn("no memory drivers registered", "registered", p.publishers) + continue + } + ps, err := p.providers[memory].PSProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the pubsub + p.publishers[k] = ps + case redis: + if _, ok := p.providers[redis]; !ok { + p.log.Warn("no redis drivers registered", "registered", p.publishers) + continue + } + + // first - try local configuration + switch { + case p.cfgPlugin.Has(configKey): + ps, err := p.providers[redis].PSProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the pubsub + p.publishers[k] = ps + case p.cfgPlugin.Has(redis): + ps, err := p.providers[redis].PSProvide(configKey) + if err != nil { + errCh <- errors.E(op, err) + return errCh + } + + // save the pubsub + p.publishers[k] = ps + continue + } + } + } + + return errCh +} + +func (p *Plugin) Stop() error { return nil } @@ -40,8 +138,9 @@ func (p *Plugin) Collects() []interface{} { } // CollectPublishers collect all plugins who implement pubsub.Publisher interface -func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) { - p.publishers[name.Name()] = subscriber +func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) { + // key redis, value - interface + p.providers[name.Name()] = subscriber } // Publish is an entry point to the websocket PUBSUB @@ -88,21 +187,25 @@ func (p *Plugin) PublishAsync(m []byte) { // Get payload for i := 0; i < len(msg.GetTopics()); i++ { - if br, ok := p.publishers[msg.GetBroker()]; ok { - err := br.Publish(m) - if err != nil { - p.log.Error("publish async error", "error", err) + if len(p.publishers) > 0 { + for j := range p.publishers { + p.publishers[j].PublishAsync(m) } - } else { - p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker()) + return } + p.log.Warn("no publishers registered") } }() } -func (p *Plugin) GetDriver(key string) pubsub.SubReader { - println(key) - return nil +func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { + const op = errors.Op("broadcast_plugin_get_driver") + // key - driver, default for example + // we should find `default` in the collected pubsubs providers + if pub, ok := p.publishers[key]; ok { + return pub, nil + } + return nil, errors.E(op, errors.Str("could not find driver by provided key")) } func (p *Plugin) RPC() interface{} { |