summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
committerValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
commit68ff941c4226074206ceed9c30bd95317aa0e9fc (patch)
tree693306256281cccefb29f4eedb7f617a9022154e /plugins/broadcast
parent25e0841c6aa5e2686da5b9f74e3d77d3814ff592 (diff)
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/config.go19
-rw-r--r--plugins/broadcast/doc/broadcast_arch.drawio1
-rw-r--r--plugins/broadcast/plugin.go119
-rw-r--r--plugins/broadcast/rpc.go75
4 files changed, 214 insertions, 0 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
new file mode 100644
index 00000000..18846f30
--- /dev/null
+++ b/plugins/broadcast/config.go
@@ -0,0 +1,19 @@
+package broadcast
+
+/*
+websockets: # <----- one of possible subscribers
+ path: /ws
+ broker: default # <------ broadcast broker to use --------------- |
+ | match
+broadcast: # <-------- broadcast entry point plugin |
+ default: # <----------------------------------------------------- |
+ driver: redis
+ test:
+ driver: memory
+
+*/
+
+// Config ...
+type Config struct {
+ Data map[string]interface{} `mapstructure:"broadcast"`
+}
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..b8d2947e
--- /dev/null
+++ b/plugins/broadcast/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-06-17T16:23:35.917Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="1VvfJYAxL9mW7TkXHKVj" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1rc5s4F/41nml3xhkkcf2YW7ud2U6z8buz7acdbGSbLUZewLnsr38lEBgkOcHhojibtHHgIMlwdK6PLkzQ5ebhc+Jv119JgKMJNIKHCbqaQAhNaNM/jPJYUBDyYEFZJWFQ0MCeMAv/xZxocOouDHDaKJgREmXhtklckDjGi6xB85OE3DeLLUnU/Natv8ISYbbwI5n6Zxhka/5ghmHsL/yKw9U6E69s/LI0J6RrPyD3NRK6nqDLhJCsONo8XOKIsa9kTFHv04Gr1Z0lOM7aVLg0bfvLxed46nxH01l8t/5OPk9ts2jmzo92/JH53WaPJQ8SsosDzFoxJujifh1meLb1F+zqPe12Sltnm4ieAXq4DKPokkQkyeuiwMfuckHpaZaQn7h2xV64eL6kV/gN4CTDDwcfDVQMo7KGyQZnySMtwisgl/OYi5lp8fP7Rp8VtHW9uzxO9LmcrKq295ykB5yZRzDWsodl7NJi/5SMzX9YDRJnNXrx0w/DgQGaHHccmeOmq+A4sq2hOG69bY7DJsenlUBrZPnA1kM3ywUhR4alm+OexF8cUIfFT0mSrcmKxH50vadeNHtgX+Y3Qrac73/jLHvk3tffZaTZK/ghzL6z6mcWP/vBG2PHVw/1k8fyJKaPW6vETn+U7bGTfbX8rKxXPB97qKd7jfKA7JIFfoJXZbzgJyucPVXOUItBgiM/C++aN6LqUV71hoT0FivxcaBxBkzoWg7/bMiSZQiep7hN3oYgJdVNvVxwSonUJTmwLjqgpejUBacmRwdERzQe7gIvlKHH3LVYhKBF2IqAq6uwnSeJ/1grsGVikx6WRdu1GtJnm0KsKJYH3cq74OnyyDA6lbc8S1CSgiO9qgyQnNvF7bfzq8vz2f/69XJtBbWTN7MtgYWKiA2YCmdmDRUimzotEqiZo71fO84gwWMt0nIJ1R0d2HPbsvu1SLCt++tokDrJAJR07Pb66suMkm7+uJj9cdGvoi2xfYD/jjc3elI0UwjUqwixrmhwTEXT6vr7ULRXrWfoFPQMSXr29frrt9sfb0nRlB5tVEVzTl3RnFetafYpaJqM+51//f1mgj6x/29I22xbt7a5Eqfpo0dsFIB+WV7dv/PDyJ9TJtMn3c3T3ZweBAmVgSSVuoCyJmvy2Y/CVUyPF5RFmLL0gjEwXPjROb+wCYOgUGKchv/mX1R0IM/HaLvWxcS6Ym1RvU0LFQZSV8UkxsMg4ZYp95IaCR+qlwDSaRRPDbIy2po4rTauvM16apwQP1j4KVO+kKnLklmvPs1cYGE3MFVmzoVzZNv9KFBl1o6N3s3BFMh+V6D2CtQa9LU6KtCLcDhHcKLl+OMh3Mtz3E7l7WfKA+AZT1UYBlizoF6JdtqLtA7RNLva9peJpoMakgDcI0VHqDCQ6GiNJpopltM2x3r91hB6OkTOFSTI5LHiIZETywNgHSujQo2BBg7kkYMJtCOWXWwbsmv/s2Ozfy42tH9CmlCc06vG9oF+5nJgFPRpxsSUXTNr11i6MuUZCrvGk5SqTXq04n/9DYusIvmsFqhxcrNKeZYXnie1JgtKSaiCv/wG+Bd9movlKW0r0tZSq/2xiF+L8DJ7nn95KYl7n3F2leeLH37ix/zGkjBefaRHs938FvtB44lrTycYqXTtb9lhkk9Sq9sWQhtfRvnEMBbwHgiLX4q7dIuFhVE9pEj5q5l69VjYGSwWfp//cIT1t9pa/2EmQEjW1xKhoIHnPAB5RtifeJ6SxU+cyfhPl9x0DoJgaaj0ERgO8noCdwBAIuLtnMlTkpTpKRpqRhJwJCZ/icPsA4PaKr/wUeL2m0DbpEl5NlSgBePCbTIqeosXmGkxNOatQJo32jUKJBTYY3ZNZbhPKXep+64W40M9uq+yH553X3AY9wVFLB0ZZwxg9yz+2cqZHZsUgSp6EiD8g0mOXMEVZHWAHAdqnb1zYpFYe1HuBZVURmJnwECm6ULHcm3YhHpMcRXEwHFZyY6aj/p2Hxf+6SdLqIyUpkkhiXM3NclX9yzD1ZuI2IDCC406nAD1jsc1vBBs6YVA0ws9O++uT9WFbVW3axLVrVehpFIlLhGEdyUscb/PfhjmUeEWtSKKWhUcFM/TbQGecEXNgZMAL/1dlHVpzo8ico+Dv0gSViDPHov5pQHLvPhLtvmCvGbTtJ379Pnmu9idMnYuUChNZggJK1lMT5E2qib/DobkwPdRzSOMUGskxxkofnC9s/1CFhM6DWlCY8cPMq5TIQ1ne8S2MlLcQjHUdipi23Uc9/TCC0OY1G8iObyAyhVqg+m1DAfNqDgwYcIPeEE1kjL6MhdJP6B/NjhN/RWmRtiImBafYCeIMR5SIQ3jxnjuu3Ftb1y9lsYV9bJ0q8UgZbXudCx76kk6+y1b54nYTbRb5SmYjF29/rlckmbSFFg/YI7k7Pe/C5gDw1MYy1ERcySP1r8j5oX6qMCKUSFzBHU6spODzJHZ1pXZw7gyETL3PC2QOSh34WmNmdMaI4Dm6H0u/BDC7A4Wlz0BmnsjT2Yo2aETNA+MBcZQFbZZtgWcoRIqr+3CvsESKqR394yGHzoB0By1XanXOaXq1quqPbokKLlIhfrDyzOcZrBDY4dmY44HWo9kBkTQ2rN0g9ZI6wLfU/Pfblsj0Mvk82NBa3ds/y1PzHptoPVIei2C1q4CAhgXtEYyADbDyV3P6xf1BE+uYpvAcYOnsuF3q9nCapqtV4kNlMKLaLQoFgObSVNG45pYtIwIddo7wQ+8QLnDpI9NgOBASmnrh6FNKDH6PwRDi/CP9nnbprxDzzsK3YjwtYHQ5ivaqO4EQGizbfJvDoTbiSA0bGLQwDVbebTOKDQ6EoPmru9geWkSV6P8MIi1qXnb2NOK3dpKvjXcgrsnEOupaCGHDuVkeG10xHrs+E5fkqUXmjo1hNpsC051TrO69aoMGh1CqIvs6LJQoX7RanBMY9rR6ZFUXvTG1WrjZ9HpwTK698Xx7Q1AiUE+76t72Y3nWHR6CkZ21iU/XjE8PZJii/D0VLED3LjwtCVjYv3D05qQsCnQPrhvv/lZOVrfjGE5LS2t1i0tLXndwu3NZRNvOxTbpFs/LmkVVHd2s5tHYbr+wFc4fKxFQvUKvSoxBoGFHVXPeraD/L72BULNfYEc1RCTM6oKa91a4SgVpic3OAnpczM/+mrCobb5kNV1hVk3LZXzoV2qCDPK7bZ2m+h8wZYc7aHv3/w5jm5IGubIA7qakywjm4mMjedbijV26tplURhTpSpfNWn0o06OIbw8R/H2QqTQJnswbZKBmxmOg0m1WosJCmHfTH+5mVN0wpsYj0ACSOuo9gNW2brBhiNs2VPdJOQuDFi3NHbiKzqJNIZP3+i+57bwgijYdqu6F3QTPd2/tbXIv/Zvv0XX/wc=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
new file mode 100644
index 00000000..3b771746
--- /dev/null
+++ b/plugins/broadcast/plugin.go
@@ -0,0 +1,119 @@
+package broadcast
+
+import (
+ "sync"
+
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
+)
+
+const PluginName string = "broadcast"
+
+type Plugin struct {
+ sync.RWMutex
+ log logger.Logger
+ // publishers implement Publisher interface
+ // and able to receive a payload
+ publishers map[string]pubsub.Publisher
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("broadcast_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+
+ p.publishers = make(map[string]pubsub.Publisher)
+ p.log = log
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectPublishers,
+ }
+}
+
+// CollectPublishers collect all plugins who implement pubsub.Publisher interface
+func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) {
+ p.publishers[name.Name()] = subscriber
+}
+
+// Publish is an entry point to the websocket PUBSUB
+func (p *Plugin) Publish(m []byte) error {
+ p.Lock()
+ defer p.Unlock()
+
+ const op = errors.Op("broadcast_plugin_publish")
+
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err = p.publishers[j].Publish(m)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+
+ return nil
+ }
+
+ p.log.Warn("no publishers registered")
+ }
+
+ return nil
+}
+
+func (p *Plugin) PublishAsync(m []byte) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ p.log.Error("message unmarshal")
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.publishers[msg.GetBroker()]; ok {
+ err := br.Publish(m)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker())
+ }
+ }
+ }()
+}
+
+func (p *Plugin) GetDriver(key string) pubsub.SubReader {
+ println(key)
+ return nil
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
new file mode 100644
index 00000000..fa853421
--- /dev/null
+++ b/plugins/broadcast/rpc.go
@@ -0,0 +1,75 @@
+package broadcast
+
+import (
+ "github.com/spiral/errors"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ "google.golang.org/protobuf/proto"
+)
+
+// rpc collectors struct
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+// Publish ... msg is a proto decoded payload
+// see: pkg/pubsub/message.fbs
+func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ const op = errors.Op("broadcast_publish")
+
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.Messages)
+
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ err = r.plugin.Publish(bb)
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
+ }
+
+ out.Ok = true
+ return nil
+}
+
+// PublishAsync ...
+// see: pkg/pubsub/message.fbs
+func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ const op = errors.Op("publish_async")
+
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.Messages)
+
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ bb, err := proto.Marshal(in.GetMessages()[i])
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
+
+ r.plugin.PublishAsync(bb)
+ }
+
+ out.Ok = true
+ return nil
+}