diff options
Diffstat (limited to 'plugins/broadcast')
-rw-r--r-- | plugins/broadcast/config.go | 27 | ||||
-rw-r--r-- | plugins/broadcast/doc/broadcast_arch.drawio | 1 | ||||
-rw-r--r-- | plugins/broadcast/interface.go | 7 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 192 | ||||
-rw-r--r-- | plugins/broadcast/rpc.go | 87 |
5 files changed, 0 insertions, 314 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go deleted file mode 100644 index 9531025b..00000000 --- a/plugins/broadcast/config.go +++ /dev/null @@ -1,27 +0,0 @@ -package broadcast - -/* - -# Global redis config (priority - 2) -default: - # redis configuration here - -websockets: # <----- one of possible subscribers - path: /ws - broker: default # <------ broadcast broker to use --------------- | - | match -broadcast: # <-------- broadcast entry point plugin | - default: # <----------------------------------------------------- | - driver: redis - # local redis config (priority - 1) - test: - driver: memory - - -priority local -> global -*/ - -// Config ... -type Config struct { - Data map[string]interface{} `mapstructure:"broadcast"` -} diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio deleted file mode 100644 index fd5ff1f9..00000000 --- a/plugins/broadcast/doc/broadcast_arch.drawio +++ /dev/null @@ -1 +0,0 @@ -<mxfile host="Electron" modified="2021-06-18T09:34:25.915Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="THNfOcV33EQGG0gzo1UK" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1bc6M4Fv41rk1vVVIg7o+Jk8l01fR2Np7e7n7a4iLbbDB4AMdJ//qVQGCQZBsHEMSTviRGIAznfj4dSRNlunq5j+318kvkwWACJO9lotxOALBkE/3EDa95AzAVJW9ZxL6Xt8m7hpn/C5JGibRufA8mtQvTKApSf11vdKMwhG5aa7PjONrWL5tHQf1b1/YCMg0z1w7Y1u++ly7Ja0iStDvxO/QXy5Q+s7KLq0lDsrS9aFtpUu4myjSOojT/tHqZwgBTryBM3u+3PWfLJ4thmDbpMFV1/fPNfXhp/FAuZ+Hz8kd0f6mr+W2e7WBDXpk8bfpa0CCONqEH8V2kiXKzXfopnK1tF5/dIq6jtmW6CtCRjD7O/SCYRkEUZ30Vz4bm3EXtSRpHT7ByRndN6MzRGfY9yKs9wziFL5Um8l73MFrBNH5Fl5CzikloTMRM1cjxtsazvG1ZZZdFGm0iJ4vy3jtKog+EmCcQVtP7Jexcw3+5hM3+4B5RmFba8z/dEFyW5DrFDYOluGpyKK7oWl8U186b4qBO8ctSoAckec/WY2iSU0KuSNrQFLcY+kIPOSxyGMXpMlpEoR3c7Vpv6hzYXfNHFK0J3f8H0/SVeF97k0Z1rsAXP/2Bu19p5OgnuRn+fPtSPXgtDkL0upVO+PBncT98sOuWHRX99vItiTaxCw+QpggP7HgB00PXEYZhwh0UgxgGduo/1yMBHkdJ14fIR89cio8BpCtZBaZmkJ81WdIkyvPkz03uQUlJ+VBvF5xCIoeSHFAVHbmh6FQFpyJHe0SHNh6mC11u6OGYGo4QRAhbEV91LGzXcWy/Vi5YY7FJ9suibmo16dNVKlakr5fbXW/Kh69XJKnV9ZqlUUqSU6RTlSkcxs653Tx+vb6dXs/+7NbLtRDU5t5M1ygSciI2WeU4M62vEFkd0iLJFXO082unGSRwqkWazwGf0Z7u6JreyiKBpu6va4PUSgYAo2OPd7efZ6jp4dvN7NtNt4o2h/oe+huWI3WkaCoVqJcRYlXRgEhFG9T1d6FoY9Iz5V3qmcLo2Ze7L18ff56TonE9mlBFM967ohlj0jT9XWoai/tdf/n3w0T5Df87I23T9aG1zWQojV49wKMA6Muy7vaz7Qe2g4iMXn7jJBsHffBiJANxwrAA0SGt09kO/EWIPruIahCR9AZTy3ft4JqcWPmelysxTPxf2RflDCT5GLqvdjPRbvG9kN4muQrLDKvCKIT9IOGaynKJj4T3xSVZGdIojhyykpqauHHZuOK5q6lxHNmeaydY+XysLnNsvbo0c54GTU/lmTkTOIp+0NeckCbrb4ze1d4USP9QoGN6cVyBtK4V6E04nEE50WL8cR/uZRlmq+v1I9fLsiUd6tAPsKaBYSXaaC7SAkRT7dy2v000DaUmCbJ5ouhQHXoSnUGjiXqKZTTNsUZnDYE1CpEzKQlSSay4T+To62VZO1VGqR49DRywIwcToAc4u1jXZFf/a4Orf25WiGE+Siiu0Vlp/YJ+ZnIg5e2XKRZTfE6tnMPpyiXJUPA5kqSU90SfFuS3vcKRVcAeVQI10lzvUhxlFztx5ZZ5S9FQBn/ZA5Av+s2hr0dta7ptydy1OxKRcwGcp8fpl13FUO8eprdZvnjxBF+zB4v9cPEJfZptnEdoe7U3rrwdZaSSpb3GH+OsSK1qWyJ083mQFYbhgHdPWNwh7nJCLEyN6imclL+s1KvGwkZvsfBH/cPREPe49RdUAMFYX42GgnqueZDZirDv0Eki9wmmLP7TJjd1ZM+bSzx9lCVDsToCd2RZoRFv44otSeKmp0pfFUmywRD5c+inFxhqK/3CJ4baZ4G2MUV5OuCgBWLhNhYVfYQuxFoMJKcRSHOmrOEgobIukjWlJX9PuUvVdzUYH3q7+yrIftx9AUHuC9BYuiJdYYDd0sjPRs7s1KRILqMnCsLfm+SwHUxKVnvIccCg1TvjjsSai3I/qCQ3EruSJUVVTWBopg7qUI9Kz4LoOS4r6FPxUV+3Ye6fnnBCJSUoTfKjMHNTk2x2z9xfnEXEJnO8kNDhBDDseFzNC4GGXkiue6GjdXctVBc0Vd3Ok6h2XAWMShW4hOc/F7DEdpf9YMyjxC0ql3B6lXBQ6CTrHDwhipoBJx6c25sgbXM7OwiiLfT+G8V+CfLssJh/1mCZN3/JOpuQV781us82OX77NnaniJ1zFGogM6RQM1lUi5M28op/e0NywMeo5tGw4LgRMkTFD6Z1tZvIohbfW+CCouMHFtcpkYarHWJbGilioTBqe0lj21Uc9/2FFxJV1K8qbHgBuDPUetNrFg6aIXHAwgRfoIs0EhF6momk7aFfK5gk9gIiIywFWIvfIRPoGE/hIQ1iYzzzw7jupY3V0Lgq/UzdajBIWc47FWVPLUZnv6bLLBF7CDaLLAVjsavx13IxmolS4OEBc4XNfv++gLksWRxjKRQxV9jR+g/EPFcfHlghFDJXwJCObOyQeeGhjrsyXZAroyFzyxoEMpeLVXgaY+aohwDQXPmohe9AmE1xcdkB0NwSXMxQ0GdI0NyTXAgBL2zTdE02+kqorKYT+3pLqJRhV8+o+aHxgeZK05l63adU7bjKW6OLgZLzVKg7vDyFSQpa3GxfNaY40FqQGaBBa0sbGrRWBp3gO3L/bTY1Av0Un58KWpui/TdbmDU20FqQXtOgtcmBAMSC1goLgM1g/Nzx/MVhgieTs0yg2OCpuPGH1dw/+ev4LDFRKTyNRtNi0bOZVFk0ro5Fs4hQq7UTbM/yuCtM2lCVFdCTUurDw9AqYAj9N4Khafhn8LptlV2h5wOFrkX4g4HQ6ogWqhsfCK02Tf5VUbgdDUKDOgYtm2ojj9YahVZOxKCJ69t7PVPEVbu+H8RaHXjZ2FHHbk0lv8h3BkasL2kL2Xcox8JrwhFr0fHdcEnWsNDUyBFqtSk41X2a1Y6rLGi0D6HOs6NprkLdotXyKTcbHJ0WpPK0Ny5nGx9Fp3vL6D4mxx91wcd9dT+r8ZyKTl/Kgp11QaARw9OCFJuGpy85K8CJhac1FhPrHp4eCAm7lAcf3NfPvipH5M4YmtHQ0o5rSUuNnbfw+DCt4237YptkbYdFWwnVXT1snMBPlhdkhsOnSiRU7dCpEkPZ06DB46ylG4rd1bpASn1dIIM3xGQIVeFBl1Y4SYXRwQOMffTe2I8OFQ41zYe0zmeYtdNSNh/aJJwwo1hua7MKrl085WgHff9hOzB4iBI/Qx6UWydK02g1YbHxbEmx2kpdmzTwQ6RUxVaTB3nQXJ0Mido8h7N7ocLRJr03bWKBmxkMvUk5WwtLToS/Gf0nZo7DhLMYj1AokNbgrQfMs3W9DUforKd6iKNn38Nsqa3ElzMpqg2fnum65zq1QRRoulRdF2y6MR7+9f3PG/mv6eNqMdV/RL/Pvg27FwS131rjqBIICSspZ3LUhXHpyymc4F5ntvRX/PRdo5YJlzXaGneXr/Nff9ApnvVByaZ4b0v5aryXsFD5kkUFRNxvx/1HY2aabuvYKnkdqRSALszMqUPOjBUy9GNjzvLhHq1HkQ9Rkb9dRJcYVlMPxAQWHOnYG2to9KxijbeaSl8QFt8MMATO8dpMAhc+IodNBloTdwlX9miDwFaMAQxjOJlUb7VDXMaMqHRoXFvvdWKdOXvx7bc/Q7loTmEftcUljSt+Xq0DuIKZ6cf59Szbt2oC8NDKw2wahYi4mwzW6NR4Nt1GrGMd5S0A0Nc+tFwGsasJkX3EkvzbjhH9LGwnjXPw+SLUdrIoVIUP6ISLQXRpXWIfXuHyWI1y7ARi/CpzgekS7vrh1rzuaFP6yGyp/8ZMJlUI42cxoDASWee4Ry7S2MXIJpfFA1csGG/KXoxa+rJziSN0kByYf3wOskhJPoKkYWWgn0IXnV7dSpOkq7w8VdJNC7+uSNyMHUCa3T3+544ThIX+biPRfPNQHAqgmMpd4vYYcirnkjV0/bmfeRV/52v6KGwVEq0xqa7atFqji8Epfqo78GzCt/gMqtD11F2WBUNejTGvLuzFyXuGGSplTo7sAWape0CBxh10st5ovxAZZxUyKk2ULrBN8lHc+SszMHY2JJvg1TWh94m1YLOyuv4Cvtg4p/xErdjcxh4dWSKiqci2Mk/0yLms6iwSB4rlYrteIoLPx/e4rrHRBpBvXHPUjXUCY7ZOLL5ebJu+19rQ4dHJHQz5cIdyT6I9HXqyZywk3dPeW43lr5WpKXeO3PkRkXN++DRmK0Hu8wnDq9qO93MMWpFQdIXfqZh3ddAbnAWspZu0M+dVc/Pgxt5grWIPwyreuITuU4Uv58kKpmTCZJ21JZITgI25fuKVyHHZ72f8EHamTOnST6rMmWbPh/M/KTvxinzWZAc/niXvaKejS5xAS6gaAd6CCekmxvEuqUvMx03KAm4cLa+L9awvYuhl3CNpes5iEiWfJQ8NqmpbNjgrKyhCa+SKTcRqQy+VgRYphDCH6HN9w5rnQNfeJFj5thnoH4X/wMHOUxhtJyyUb6el5yOADsqXFvCKzZZKedn62YM7uI/9bPtBxjxSTOnkcE42PJQ9mPNafsFd6G0yqxCFQRY0zvMqzOoXZxxLEuhdNVnY7xzFkAlfAWtJVN440xsGIdBhHOGdfXaRLYoml18iD+Ir/g8=</diagram></mxfile>
\ No newline at end of file diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go deleted file mode 100644 index eda3572f..00000000 --- a/plugins/broadcast/interface.go +++ /dev/null @@ -1,7 +0,0 @@ -package broadcast - -import "github.com/spiral/roadrunner/v2/common/pubsub" - -type Broadcaster interface { - GetDriver(key string) (pubsub.SubReader, error) -} diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go deleted file mode 100644 index 40263eaa..00000000 --- a/plugins/broadcast/plugin.go +++ /dev/null @@ -1,192 +0,0 @@ -package broadcast - -import ( - "fmt" - "sync" - - endure "github.com/spiral/endure/pkg/container" - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/config" - "github.com/spiral/roadrunner/v2/plugins/logger" -) - -const ( - PluginName string = "broadcast" - // driver is the mandatory field which should present in every storage - driver string = "driver" - - // every driver should have config section for the local configuration - conf string = "config" -) - -type Plugin struct { - sync.RWMutex - - cfg *Config - cfgPlugin config.Configurer - log logger.Logger - // publishers implement Publisher interface - // and able to receive a payload - publishers map[string]pubsub.PubSub - constructors map[string]pubsub.Constructor -} - -func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { - const op = errors.Op("broadcast_plugin_init") - if !cfg.Has(PluginName) { - return errors.E(op, errors.Disabled) - } - p.cfg = &Config{} - // unmarshal config section - err := cfg.UnmarshalKey(PluginName, &p.cfg.Data) - if err != nil { - return errors.E(op, err) - } - - p.publishers = make(map[string]pubsub.PubSub) - p.constructors = make(map[string]pubsub.Constructor) - - p.log = log - p.cfgPlugin = cfg - return nil -} - -func (p *Plugin) Serve() chan error { - return make(chan error, 1) -} - -func (p *Plugin) Stop() error { - return nil -} - -func (p *Plugin) Collects() []interface{} { - return []interface{}{ - p.CollectPublishers, - } -} - -// CollectPublishers collect all plugins who implement pubsub.Publisher interface -func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) { - // key redis, value - interface - p.constructors[name.Name()] = constructor -} - -// Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(m *pubsub.Message) error { - p.Lock() - defer p.Unlock() - - const op = errors.Op("broadcast_plugin_publish") - - // check if any publisher registered - if len(p.publishers) > 0 { - for j := range p.publishers { - err := p.publishers[j].Publish(m) - if err != nil { - return errors.E(op, err) - } - } - return nil - } else { - p.log.Warn("no publishers registered") - } - - return nil -} - -func (p *Plugin) PublishAsync(m *pubsub.Message) { - // TODO(rustatian) channel here? - go func() { - p.Lock() - defer p.Unlock() - // check if any publisher registered - if len(p.publishers) > 0 { - for j := range p.publishers { - err := p.publishers[j].Publish(m) - if err != nil { - p.log.Error("publishAsync", "error", err) - // continue publishing to the other registered publishers - continue - } - } - } else { - p.log.Warn("no publishers registered") - } - }() -} - -func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { - const op = errors.Op("broadcast_plugin_get_driver") - - // choose a driver - if val, ok := p.cfg.Data[key]; ok { - // check type of the v - // should be a map[string]interface{} - switch t := val.(type) { - // correct type - case map[string]interface{}: - if _, ok := t[driver]; !ok { - panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val))) - } - default: - return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation")) - } - - // config key for the particular sub-driver broadcast.memcached.config - configKey := fmt.Sprintf("%s.%s.%s", PluginName, key, conf) - - drName := val.(map[string]interface{})[driver] - - // driver name should be a string - if drStr, ok := drName.(string); ok { - if _, ok := p.constructors[drStr]; !ok { - return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr)) - } - - switch { - // try local config first - case p.cfgPlugin.Has(configKey): - // we found a local configuration - ps, err := p.constructors[drStr].PSConstruct(configKey) - if err != nil { - return nil, errors.E(op, err) - } - - // save the initialized publisher channel - // for the in-memory, register new publishers - p.publishers[configKey] = ps - - return ps, nil - case p.cfgPlugin.Has(key): - // try global driver section after local - ps, err := p.constructors[drStr].PSConstruct(key) - if err != nil { - return nil, errors.E(op, err) - } - - // save the initialized publisher channel - // for the in-memory, register new publishers - p.publishers[configKey] = ps - - return ps, nil - default: - p.log.Error("can't find local or global configuration, this section will be skipped", "local: ", configKey, "global: ", key) - } - } - } - return nil, errors.E(op, errors.Str("could not find driver by provided key")) -} - -func (p *Plugin) RPC() interface{} { - return &rpc{ - plugin: p, - log: p.log, - } -} - -func (p *Plugin) Name() string { - return PluginName -} - -func (p *Plugin) Available() {} diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go deleted file mode 100644 index 475076a0..00000000 --- a/plugins/broadcast/rpc.go +++ /dev/null @@ -1,87 +0,0 @@ -package broadcast - -import ( - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/common/pubsub" - "github.com/spiral/roadrunner/v2/plugins/logger" - websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" -) - -// rpc collectors struct -type rpc struct { - plugin *Plugin - log logger.Logger -} - -// Publish ... msg is a proto decoded payload -// see: root/proto -func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error { - const op = errors.Op("broadcast_publish") - - // just return in case of nil message - if in == nil { - out.Ok = false - return nil - } - - r.log.Debug("message published", "msg", in.String()) - msgLen := len(in.GetMessages()) - - for i := 0; i < msgLen; i++ { - for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { - if in.GetMessages()[i].GetTopics()[j] == "" { - r.log.Warn("message with empty topic, skipping") - // skip empty topics - continue - } - - tmp := &pubsub.Message{ - Topic: in.GetMessages()[i].GetTopics()[j], - Payload: in.GetMessages()[i].GetPayload(), - } - - err := r.plugin.Publish(tmp) - if err != nil { - out.Ok = false - return errors.E(op, err) - } - } - } - - out.Ok = true - return nil -} - -// PublishAsync ... -// see: root/proto -func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error { - // just return in case of nil message - if in == nil { - out.Ok = false - return nil - } - - r.log.Debug("message published", "msg", in.GetMessages()) - - msgLen := len(in.GetMessages()) - - for i := 0; i < msgLen; i++ { - for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { - if in.GetMessages()[i].GetTopics()[j] == "" { - r.log.Warn("message with empty topic, skipping") - // skip empty topics - continue - } - - tmp := &pubsub.Message{ - Topic: in.GetMessages()[i].GetTopics()[j], - Payload: in.GetMessages()[i].GetPayload(), - } - - r.plugin.PublishAsync(tmp) - } - } - - out.Ok = true - return nil -} |