summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/config.go25
-rw-r--r--plugins/broadcast/doc/broadcast_arch.drawio1
-rw-r--r--plugins/broadcast/interface.go7
-rw-r--r--plugins/broadcast/plugin.go208
-rw-r--r--plugins/broadcast/rpc.go87
5 files changed, 328 insertions, 0 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
new file mode 100644
index 00000000..4f1e5213
--- /dev/null
+++ b/plugins/broadcast/config.go
@@ -0,0 +1,25 @@
+package broadcast
+
+/*
+
+# Global redis config (priority - 2)
+
+websockets: # <----- one of possible subscribers
+ path: /ws
+ broker: default # <------ broadcast broker to use --------------- |
+ | match
+broadcast: # <-------- broadcast entry point plugin |
+ default: # <----------------------------------------------------- |
+ driver: redis
+ # local redis config (priority - 1)
+ test:
+ driver: memory
+
+
+priority local -> global
+*/
+
+// Config ...
+type Config struct {
+ Data map[string]interface{} `mapstructure:"broadcast"`
+}
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio
new file mode 100644
index 00000000..fd5ff1f9
--- /dev/null
+++ b/plugins/broadcast/doc/broadcast_arch.drawio
@@ -0,0 +1 @@
+<mxfile host="Electron" modified="2021-06-18T09:34:25.915Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="THNfOcV33EQGG0gzo1UK" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1bc6M4Fv41rk1vVVIg7o+Jk8l01fR2Np7e7n7a4iLbbDB4AMdJ//qVQGCQZBsHEMSTviRGIAznfj4dSRNlunq5j+318kvkwWACJO9lotxOALBkE/3EDa95AzAVJW9ZxL6Xt8m7hpn/C5JGibRufA8mtQvTKApSf11vdKMwhG5aa7PjONrWL5tHQf1b1/YCMg0z1w7Y1u++ly7Ja0iStDvxO/QXy5Q+s7KLq0lDsrS9aFtpUu4myjSOojT/tHqZwgBTryBM3u+3PWfLJ4thmDbpMFV1/fPNfXhp/FAuZ+Hz8kd0f6mr+W2e7WBDXpk8bfpa0CCONqEH8V2kiXKzXfopnK1tF5/dIq6jtmW6CtCRjD7O/SCYRkEUZ30Vz4bm3EXtSRpHT7ByRndN6MzRGfY9yKs9wziFL5Um8l73MFrBNH5Fl5CzikloTMRM1cjxtsazvG1ZZZdFGm0iJ4vy3jtKog+EmCcQVtP7Jexcw3+5hM3+4B5RmFba8z/dEFyW5DrFDYOluGpyKK7oWl8U186b4qBO8ctSoAckec/WY2iSU0KuSNrQFLcY+kIPOSxyGMXpMlpEoR3c7Vpv6hzYXfNHFK0J3f8H0/SVeF97k0Z1rsAXP/2Bu19p5OgnuRn+fPtSPXgtDkL0upVO+PBncT98sOuWHRX99vItiTaxCw+QpggP7HgB00PXEYZhwh0UgxgGduo/1yMBHkdJ14fIR89cio8BpCtZBaZmkJ81WdIkyvPkz03uQUlJ+VBvF5xCIoeSHFAVHbmh6FQFpyJHe0SHNh6mC11u6OGYGo4QRAhbEV91LGzXcWy/Vi5YY7FJ9suibmo16dNVKlakr5fbXW/Kh69XJKnV9ZqlUUqSU6RTlSkcxs653Tx+vb6dXs/+7NbLtRDU5t5M1ygSciI2WeU4M62vEFkd0iLJFXO082unGSRwqkWazwGf0Z7u6JreyiKBpu6va4PUSgYAo2OPd7efZ6jp4dvN7NtNt4o2h/oe+huWI3WkaCoVqJcRYlXRgEhFG9T1d6FoY9Iz5V3qmcLo2Ze7L18ff56TonE9mlBFM967ohlj0jT9XWoai/tdf/n3w0T5Df87I23T9aG1zWQojV49wKMA6Muy7vaz7Qe2g4iMXn7jJBsHffBiJANxwrAA0SGt09kO/EWIPruIahCR9AZTy3ft4JqcWPmelysxTPxf2RflDCT5GLqvdjPRbvG9kN4muQrLDKvCKIT9IOGaynKJj4T3xSVZGdIojhyykpqauHHZuOK5q6lxHNmeaydY+XysLnNsvbo0c54GTU/lmTkTOIp+0NeckCbrb4ze1d4USP9QoGN6cVyBtK4V6E04nEE50WL8cR/uZRlmq+v1I9fLsiUd6tAPsKaBYSXaaC7SAkRT7dy2v000DaUmCbJ5ouhQHXoSnUGjiXqKZTTNsUZnDYE1CpEzKQlSSay4T+To62VZO1VGqR49DRywIwcToAc4u1jXZFf/a4Orf25WiGE+Siiu0Vlp/YJ+ZnIg5e2XKRZTfE6tnMPpyiXJUPA5kqSU90SfFuS3vcKRVcAeVQI10lzvUhxlFztx5ZZ5S9FQBn/ZA5Av+s2hr0dta7ptydy1OxKRcwGcp8fpl13FUO8eprdZvnjxBF+zB4v9cPEJfZptnEdoe7U3rrwdZaSSpb3GH+OsSK1qWyJ083mQFYbhgHdPWNwh7nJCLEyN6imclL+s1KvGwkZvsfBH/cPREPe49RdUAMFYX42GgnqueZDZirDv0Eki9wmmLP7TJjd1ZM+bSzx9lCVDsToCd2RZoRFv44otSeKmp0pfFUmywRD5c+inFxhqK/3CJ4baZ4G2MUV5OuCgBWLhNhYVfYQuxFoMJKcRSHOmrOEgobIukjWlJX9PuUvVdzUYH3q7+yrIftx9AUHuC9BYuiJdYYDd0sjPRs7s1KRILqMnCsLfm+SwHUxKVnvIccCg1TvjjsSai3I/qCQ3EruSJUVVTWBopg7qUI9Kz4LoOS4r6FPxUV+3Ye6fnnBCJSUoTfKjMHNTk2x2z9xfnEXEJnO8kNDhBDDseFzNC4GGXkiue6GjdXctVBc0Vd3Ok6h2XAWMShW4hOc/F7DEdpf9YMyjxC0ql3B6lXBQ6CTrHDwhipoBJx6c25sgbXM7OwiiLfT+G8V+CfLssJh/1mCZN3/JOpuQV781us82OX77NnaniJ1zFGogM6RQM1lUi5M28op/e0NywMeo5tGw4LgRMkTFD6Z1tZvIohbfW+CCouMHFtcpkYarHWJbGilioTBqe0lj21Uc9/2FFxJV1K8qbHgBuDPUetNrFg6aIXHAwgRfoIs0EhF6momk7aFfK5gk9gIiIywFWIvfIRPoGE/hIQ1iYzzzw7jupY3V0Lgq/UzdajBIWc47FWVPLUZnv6bLLBF7CDaLLAVjsavx13IxmolS4OEBc4XNfv++gLksWRxjKRQxV9jR+g/EPFcfHlghFDJXwJCObOyQeeGhjrsyXZAroyFzyxoEMpeLVXgaY+aohwDQXPmohe9AmE1xcdkB0NwSXMxQ0GdI0NyTXAgBL2zTdE02+kqorKYT+3pLqJRhV8+o+aHxgeZK05l63adU7bjKW6OLgZLzVKg7vDyFSQpa3GxfNaY40FqQGaBBa0sbGrRWBp3gO3L/bTY1Av0Un58KWpui/TdbmDU20FqQXtOgtcmBAMSC1goLgM1g/Nzx/MVhgieTs0yg2OCpuPGH1dw/+ev4LDFRKTyNRtNi0bOZVFk0ro5Fs4hQq7UTbM/yuCtM2lCVFdCTUurDw9AqYAj9N4Khafhn8LptlV2h5wOFrkX4g4HQ6ogWqhsfCK02Tf5VUbgdDUKDOgYtm2ojj9YahVZOxKCJ69t7PVPEVbu+H8RaHXjZ2FHHbk0lv8h3BkasL2kL2Xcox8JrwhFr0fHdcEnWsNDUyBFqtSk41X2a1Y6rLGi0D6HOs6NprkLdotXyKTcbHJ0WpPK0Ny5nGx9Fp3vL6D4mxx91wcd9dT+r8ZyKTl/Kgp11QaARw9OCFJuGpy85K8CJhac1FhPrHp4eCAm7lAcf3NfPvipH5M4YmtHQ0o5rSUuNnbfw+DCt4237YptkbYdFWwnVXT1snMBPlhdkhsOnSiRU7dCpEkPZ06DB46ylG4rd1bpASn1dIIM3xGQIVeFBl1Y4SYXRwQOMffTe2I8OFQ41zYe0zmeYtdNSNh/aJJwwo1hua7MKrl085WgHff9hOzB4iBI/Qx6UWydK02g1YbHxbEmx2kpdmzTwQ6RUxVaTB3nQXJ0Mido8h7N7ocLRJr03bWKBmxkMvUk5WwtLToS/Gf0nZo7DhLMYj1AokNbgrQfMs3W9DUforKd6iKNn38Nsqa3ElzMpqg2fnum65zq1QRRoulRdF2y6MR7+9f3PG/mv6eNqMdV/RL/Pvg27FwS131rjqBIICSspZ3LUhXHpyymc4F5ntvRX/PRdo5YJlzXaGneXr/Nff9ApnvVByaZ4b0v5aryXsFD5kkUFRNxvx/1HY2aabuvYKnkdqRSALszMqUPOjBUy9GNjzvLhHq1HkQ9Rkb9dRJcYVlMPxAQWHOnYG2to9KxijbeaSl8QFt8MMATO8dpMAhc+IodNBloTdwlX9miDwFaMAQxjOJlUb7VDXMaMqHRoXFvvdWKdOXvx7bc/Q7loTmEftcUljSt+Xq0DuIKZ6cf59Szbt2oC8NDKw2wahYi4mwzW6NR4Nt1GrGMd5S0A0Nc+tFwGsasJkX3EkvzbjhH9LGwnjXPw+SLUdrIoVIUP6ISLQXRpXWIfXuHyWI1y7ARi/CpzgekS7vrh1rzuaFP6yGyp/8ZMJlUI42cxoDASWee4Ry7S2MXIJpfFA1csGG/KXoxa+rJziSN0kByYf3wOskhJPoKkYWWgn0IXnV7dSpOkq7w8VdJNC7+uSNyMHUCa3T3+544ThIX+biPRfPNQHAqgmMpd4vYYcirnkjV0/bmfeRV/52v6KGwVEq0xqa7atFqji8Epfqo78GzCt/gMqtD11F2WBUNejTGvLuzFyXuGGSplTo7sAWape0CBxh10st5ovxAZZxUyKk2ULrBN8lHc+SszMHY2JJvg1TWh94m1YLOyuv4Cvtg4p/xErdjcxh4dWSKiqci2Mk/0yLms6iwSB4rlYrteIoLPx/e4rrHRBpBvXHPUjXUCY7ZOLL5ebJu+19rQ4dHJHQz5cIdyT6I9HXqyZywk3dPeW43lr5WpKXeO3PkRkXN++DRmK0Hu8wnDq9qO93MMWpFQdIXfqZh3ddAbnAWspZu0M+dVc/Pgxt5grWIPwyreuITuU4Uv58kKpmTCZJ21JZITgI25fuKVyHHZ72f8EHamTOnST6rMmWbPh/M/KTvxinzWZAc/niXvaKejS5xAS6gaAd6CCekmxvEuqUvMx03KAm4cLa+L9awvYuhl3CNpes5iEiWfJQ8NqmpbNjgrKyhCa+SKTcRqQy+VgRYphDCH6HN9w5rnQNfeJFj5thnoH4X/wMHOUxhtJyyUb6el5yOADsqXFvCKzZZKedn62YM7uI/9bPtBxjxSTOnkcE42PJQ9mPNafsFd6G0yqxCFQRY0zvMqzOoXZxxLEuhdNVnY7xzFkAlfAWtJVN440xsGIdBhHOGdfXaRLYoml18iD+Ir/g8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
new file mode 100644
index 00000000..46709d71
--- /dev/null
+++ b/plugins/broadcast/interface.go
@@ -0,0 +1,7 @@
+package broadcast
+
+import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+
+type Broadcaster interface {
+ GetDriver(key string) (pubsub.SubReader, error)
+}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
new file mode 100644
index 00000000..6ddef806
--- /dev/null
+++ b/plugins/broadcast/plugin.go
@@ -0,0 +1,208 @@
+package broadcast
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/google/uuid"
+ endure "github.com/spiral/endure/pkg/container"
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/config"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+)
+
+const (
+ PluginName string = "broadcast"
+ // driver is the mandatory field which should present in every storage
+ driver string = "driver"
+
+ redis string = "redis"
+ memory string = "memory"
+)
+
+type Plugin struct {
+ sync.RWMutex
+
+ cfg *Config
+ cfgPlugin config.Configurer
+ log logger.Logger
+ // publishers implement Publisher interface
+ // and able to receive a payload
+ publishers map[string]pubsub.PubSub
+ constructors map[string]pubsub.Constructor
+}
+
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
+ const op = errors.Op("broadcast_plugin_init")
+ if !cfg.Has(PluginName) {
+ return errors.E(op, errors.Disabled)
+ }
+ p.cfg = &Config{}
+ // unmarshal config section
+ err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.publishers = make(map[string]pubsub.PubSub)
+ p.constructors = make(map[string]pubsub.Constructor)
+
+ p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ return make(chan error)
+}
+
+func (p *Plugin) Stop() error {
+ return nil
+}
+
+func (p *Plugin) Collects() []interface{} {
+ return []interface{}{
+ p.CollectPublishers,
+ }
+}
+
+// CollectPublishers collect all plugins who implement pubsub.Publisher interface
+func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) {
+ // key redis, value - interface
+ p.constructors[name.Name()] = constructor
+}
+
+// Publish is an entry point to the websocket PUBSUB
+func (p *Plugin) Publish(m *pubsub.Message) error {
+ p.Lock()
+ defer p.Unlock()
+
+ const op = errors.Op("broadcast_plugin_publish")
+
+ // check if any publisher registered
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err := p.publishers[j].Publish(m)
+ if err != nil {
+ return errors.E(op, err)
+ }
+ }
+ return nil
+ } else {
+ p.log.Warn("no publishers registered")
+ }
+
+ return nil
+}
+
+func (p *Plugin) PublishAsync(m *pubsub.Message) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ // check if any publisher registered
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err := p.publishers[j].Publish(m)
+ if err != nil {
+ p.log.Error("publishAsync", "error", err)
+ // continue publish to other registered publishers
+ continue
+ }
+ }
+ } else {
+ p.log.Warn("no publishers registered")
+ }
+ }()
+}
+
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { //nolint:gocognit
+ const op = errors.Op("broadcast_plugin_get_driver")
+
+ // choose a driver
+ if val, ok := p.cfg.Data[key]; ok {
+ // check type of the v
+ // should be a map[string]interface{}
+ switch t := val.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val)))
+ }
+ default:
+ return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation"))
+ }
+
+ // config key for the particular sub-driver kv.memcached
+ configKey := fmt.Sprintf("%s.%s", PluginName, key)
+
+ switch val.(map[string]interface{})[driver] {
+ case memory:
+ if _, ok := p.constructors[memory]; !ok {
+ return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers))
+ }
+ ps, err := p.constructors[memory].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
+ p.publishers[uuid.NewString()] = ps
+
+ return ps, nil
+ case redis:
+ if _, ok := p.constructors[redis]; !ok {
+ return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers))
+ }
+
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ ps, err := p.constructors[redis].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if section already exists, return new connection
+ if _, ok := p.publishers[configKey]; ok {
+ return ps, nil
+ }
+
+ // if not - initialize a connection
+ p.publishers[configKey] = ps
+ return ps, nil
+
+ // then try global if local does not exist
+ case p.cfgPlugin.Has(redis):
+ ps, err := p.constructors[redis].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // if section already exists, return new connection
+ if _, ok := p.publishers[configKey]; ok {
+ return ps, nil
+ }
+
+ // if not - initialize a connection
+ p.publishers[configKey] = ps
+ return ps, nil
+ }
+ }
+ }
+ return nil, errors.E(op, errors.Str("could not find driver by provided key"))
+}
+
+func (p *Plugin) RPC() interface{} {
+ return &rpc{
+ plugin: p,
+ log: p.log,
+ }
+}
+
+func (p *Plugin) Name() string {
+ return PluginName
+}
+
+func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
new file mode 100644
index 00000000..2ee211f8
--- /dev/null
+++ b/plugins/broadcast/rpc.go
@@ -0,0 +1,87 @@
+package broadcast
+
+import (
+ "github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
+)
+
+// rpc collectors struct
+type rpc struct {
+ plugin *Plugin
+ log logger.Logger
+}
+
+// Publish ... msg is a proto decoded payload
+// see: root/proto
+func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ const op = errors.Op("broadcast_publish")
+
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.String())
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
+
+ err := r.plugin.Publish(tmp)
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
+ }
+ }
+
+ out.Ok = true
+ return nil
+}
+
+// PublishAsync ...
+// see: root/proto
+func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
+ // just return in case of nil message
+ if in == nil {
+ out.Ok = false
+ return nil
+ }
+
+ r.log.Debug("message published", "msg", in.GetMessages())
+
+ msgLen := len(in.GetMessages())
+
+ for i := 0; i < msgLen; i++ {
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
+
+ r.plugin.PublishAsync(tmp)
+ }
+ }
+
+ out.Ok = true
+ return nil
+}