summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/config.go27
-rw-r--r--plugins/broadcast/doc/broadcast_arch.drawio1
-rw-r--r--plugins/broadcast/interface.go7
-rw-r--r--plugins/broadcast/plugin.go192
-rw-r--r--plugins/broadcast/rpc.go87
5 files changed, 0 insertions, 314 deletions
diff --git a/plugins/broadcast/config.go b/plugins/broadcast/config.go
deleted file mode 100644
index 9531025b..00000000
--- a/plugins/broadcast/config.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package broadcast
-
-/*
-
-# Global redis config (priority - 2)
-default:
- # redis configuration here
-
-websockets: # <----- one of possible subscribers
- path: /ws
- broker: default # <------ broadcast broker to use --------------- |
- | match
-broadcast: # <-------- broadcast entry point plugin |
- default: # <----------------------------------------------------- |
- driver: redis
- # local redis config (priority - 1)
- test:
- driver: memory
-
-
-priority local -> global
-*/
-
-// Config ...
-type Config struct {
- Data map[string]interface{} `mapstructure:"broadcast"`
-}
diff --git a/plugins/broadcast/doc/broadcast_arch.drawio b/plugins/broadcast/doc/broadcast_arch.drawio
deleted file mode 100644
index fd5ff1f9..00000000
--- a/plugins/broadcast/doc/broadcast_arch.drawio
+++ /dev/null
@@ -1 +0,0 @@
-<mxfile host="Electron" modified="2021-06-18T09:34:25.915Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/14.6.13 Chrome/89.0.4389.128 Electron/12.0.9 Safari/537.36" etag="THNfOcV33EQGG0gzo1UK" version="14.6.13" type="device"><diagram id="xG4Au6HO45p6fae_AhkE" name="Page-1">7V1bc6M4Fv41rk1vVVIg7o+Jk8l01fR2Np7e7n7a4iLbbDB4AMdJ//qVQGCQZBsHEMSTviRGIAznfj4dSRNlunq5j+318kvkwWACJO9lotxOALBkE/3EDa95AzAVJW9ZxL6Xt8m7hpn/C5JGibRufA8mtQvTKApSf11vdKMwhG5aa7PjONrWL5tHQf1b1/YCMg0z1w7Y1u++ly7Ja0iStDvxO/QXy5Q+s7KLq0lDsrS9aFtpUu4myjSOojT/tHqZwgBTryBM3u+3PWfLJ4thmDbpMFV1/fPNfXhp/FAuZ+Hz8kd0f6mr+W2e7WBDXpk8bfpa0CCONqEH8V2kiXKzXfopnK1tF5/dIq6jtmW6CtCRjD7O/SCYRkEUZ30Vz4bm3EXtSRpHT7ByRndN6MzRGfY9yKs9wziFL5Um8l73MFrBNH5Fl5CzikloTMRM1cjxtsazvG1ZZZdFGm0iJ4vy3jtKog+EmCcQVtP7Jexcw3+5hM3+4B5RmFba8z/dEFyW5DrFDYOluGpyKK7oWl8U186b4qBO8ctSoAckec/WY2iSU0KuSNrQFLcY+kIPOSxyGMXpMlpEoR3c7Vpv6hzYXfNHFK0J3f8H0/SVeF97k0Z1rsAXP/2Bu19p5OgnuRn+fPtSPXgtDkL0upVO+PBncT98sOuWHRX99vItiTaxCw+QpggP7HgB00PXEYZhwh0UgxgGduo/1yMBHkdJ14fIR89cio8BpCtZBaZmkJ81WdIkyvPkz03uQUlJ+VBvF5xCIoeSHFAVHbmh6FQFpyJHe0SHNh6mC11u6OGYGo4QRAhbEV91LGzXcWy/Vi5YY7FJ9suibmo16dNVKlakr5fbXW/Kh69XJKnV9ZqlUUqSU6RTlSkcxs653Tx+vb6dXs/+7NbLtRDU5t5M1ygSciI2WeU4M62vEFkd0iLJFXO082unGSRwqkWazwGf0Z7u6JreyiKBpu6va4PUSgYAo2OPd7efZ6jp4dvN7NtNt4o2h/oe+huWI3WkaCoVqJcRYlXRgEhFG9T1d6FoY9Iz5V3qmcLo2Ze7L18ff56TonE9mlBFM967ohlj0jT9XWoai/tdf/n3w0T5Df87I23T9aG1zWQojV49wKMA6Muy7vaz7Qe2g4iMXn7jJBsHffBiJANxwrAA0SGt09kO/EWIPruIahCR9AZTy3ft4JqcWPmelysxTPxf2RflDCT5GLqvdjPRbvG9kN4muQrLDKvCKIT9IOGaynKJj4T3xSVZGdIojhyykpqauHHZuOK5q6lxHNmeaydY+XysLnNsvbo0c54GTU/lmTkTOIp+0NeckCbrb4ze1d4USP9QoGN6cVyBtK4V6E04nEE50WL8cR/uZRlmq+v1I9fLsiUd6tAPsKaBYSXaaC7SAkRT7dy2v000DaUmCbJ5ouhQHXoSnUGjiXqKZTTNsUZnDYE1CpEzKQlSSay4T+To62VZO1VGqR49DRywIwcToAc4u1jXZFf/a4Orf25WiGE+Siiu0Vlp/YJ+ZnIg5e2XKRZTfE6tnMPpyiXJUPA5kqSU90SfFuS3vcKRVcAeVQI10lzvUhxlFztx5ZZ5S9FQBn/ZA5Av+s2hr0dta7ptydy1OxKRcwGcp8fpl13FUO8eprdZvnjxBF+zB4v9cPEJfZptnEdoe7U3rrwdZaSSpb3GH+OsSK1qWyJ083mQFYbhgHdPWNwh7nJCLEyN6imclL+s1KvGwkZvsfBH/cPREPe49RdUAMFYX42GgnqueZDZirDv0Eki9wmmLP7TJjd1ZM+bSzx9lCVDsToCd2RZoRFv44otSeKmp0pfFUmywRD5c+inFxhqK/3CJ4baZ4G2MUV5OuCgBWLhNhYVfYQuxFoMJKcRSHOmrOEgobIukjWlJX9PuUvVdzUYH3q7+yrIftx9AUHuC9BYuiJdYYDd0sjPRs7s1KRILqMnCsLfm+SwHUxKVnvIccCg1TvjjsSai3I/qCQ3EruSJUVVTWBopg7qUI9Kz4LoOS4r6FPxUV+3Ye6fnnBCJSUoTfKjMHNTk2x2z9xfnEXEJnO8kNDhBDDseFzNC4GGXkiue6GjdXctVBc0Vd3Ok6h2XAWMShW4hOc/F7DEdpf9YMyjxC0ql3B6lXBQ6CTrHDwhipoBJx6c25sgbXM7OwiiLfT+G8V+CfLssJh/1mCZN3/JOpuQV781us82OX77NnaniJ1zFGogM6RQM1lUi5M28op/e0NywMeo5tGw4LgRMkTFD6Z1tZvIohbfW+CCouMHFtcpkYarHWJbGilioTBqe0lj21Uc9/2FFxJV1K8qbHgBuDPUetNrFg6aIXHAwgRfoIs0EhF6momk7aFfK5gk9gIiIywFWIvfIRPoGE/hIQ1iYzzzw7jupY3V0Lgq/UzdajBIWc47FWVPLUZnv6bLLBF7CDaLLAVjsavx13IxmolS4OEBc4XNfv++gLksWRxjKRQxV9jR+g/EPFcfHlghFDJXwJCObOyQeeGhjrsyXZAroyFzyxoEMpeLVXgaY+aohwDQXPmohe9AmE1xcdkB0NwSXMxQ0GdI0NyTXAgBL2zTdE02+kqorKYT+3pLqJRhV8+o+aHxgeZK05l63adU7bjKW6OLgZLzVKg7vDyFSQpa3GxfNaY40FqQGaBBa0sbGrRWBp3gO3L/bTY1Av0Un58KWpui/TdbmDU20FqQXtOgtcmBAMSC1goLgM1g/Nzx/MVhgieTs0yg2OCpuPGH1dw/+ev4LDFRKTyNRtNi0bOZVFk0ro5Fs4hQq7UTbM/yuCtM2lCVFdCTUurDw9AqYAj9N4Khafhn8LptlV2h5wOFrkX4g4HQ6ogWqhsfCK02Tf5VUbgdDUKDOgYtm2ojj9YahVZOxKCJ69t7PVPEVbu+H8RaHXjZ2FHHbk0lv8h3BkasL2kL2Xcox8JrwhFr0fHdcEnWsNDUyBFqtSk41X2a1Y6rLGi0D6HOs6NprkLdotXyKTcbHJ0WpPK0Ny5nGx9Fp3vL6D4mxx91wcd9dT+r8ZyKTl/Kgp11QaARw9OCFJuGpy85K8CJhac1FhPrHp4eCAm7lAcf3NfPvipH5M4YmtHQ0o5rSUuNnbfw+DCt4237YptkbYdFWwnVXT1snMBPlhdkhsOnSiRU7dCpEkPZ06DB46ylG4rd1bpASn1dIIM3xGQIVeFBl1Y4SYXRwQOMffTe2I8OFQ41zYe0zmeYtdNSNh/aJJwwo1hua7MKrl085WgHff9hOzB4iBI/Qx6UWydK02g1YbHxbEmx2kpdmzTwQ6RUxVaTB3nQXJ0Mido8h7N7ocLRJr03bWKBmxkMvUk5WwtLToS/Gf0nZo7DhLMYj1AokNbgrQfMs3W9DUforKd6iKNn38Nsqa3ElzMpqg2fnum65zq1QRRoulRdF2y6MR7+9f3PG/mv6eNqMdV/RL/Pvg27FwS131rjqBIICSspZ3LUhXHpyymc4F5ntvRX/PRdo5YJlzXaGneXr/Nff9ApnvVByaZ4b0v5aryXsFD5kkUFRNxvx/1HY2aabuvYKnkdqRSALszMqUPOjBUy9GNjzvLhHq1HkQ9Rkb9dRJcYVlMPxAQWHOnYG2to9KxijbeaSl8QFt8MMATO8dpMAhc+IodNBloTdwlX9miDwFaMAQxjOJlUb7VDXMaMqHRoXFvvdWKdOXvx7bc/Q7loTmEftcUljSt+Xq0DuIKZ6cf59Szbt2oC8NDKw2wahYi4mwzW6NR4Nt1GrGMd5S0A0Nc+tFwGsasJkX3EkvzbjhH9LGwnjXPw+SLUdrIoVIUP6ISLQXRpXWIfXuHyWI1y7ARi/CpzgekS7vrh1rzuaFP6yGyp/8ZMJlUI42cxoDASWee4Ry7S2MXIJpfFA1csGG/KXoxa+rJziSN0kByYf3wOskhJPoKkYWWgn0IXnV7dSpOkq7w8VdJNC7+uSNyMHUCa3T3+544ThIX+biPRfPNQHAqgmMpd4vYYcirnkjV0/bmfeRV/52v6KGwVEq0xqa7atFqji8Epfqo78GzCt/gMqtD11F2WBUNejTGvLuzFyXuGGSplTo7sAWape0CBxh10st5ovxAZZxUyKk2ULrBN8lHc+SszMHY2JJvg1TWh94m1YLOyuv4Cvtg4p/xErdjcxh4dWSKiqci2Mk/0yLms6iwSB4rlYrteIoLPx/e4rrHRBpBvXHPUjXUCY7ZOLL5ebJu+19rQ4dHJHQz5cIdyT6I9HXqyZywk3dPeW43lr5WpKXeO3PkRkXN++DRmK0Hu8wnDq9qO93MMWpFQdIXfqZh3ddAbnAWspZu0M+dVc/Pgxt5grWIPwyreuITuU4Uv58kKpmTCZJ21JZITgI25fuKVyHHZ72f8EHamTOnST6rMmWbPh/M/KTvxinzWZAc/niXvaKejS5xAS6gaAd6CCekmxvEuqUvMx03KAm4cLa+L9awvYuhl3CNpes5iEiWfJQ8NqmpbNjgrKyhCa+SKTcRqQy+VgRYphDCH6HN9w5rnQNfeJFj5thnoH4X/wMHOUxhtJyyUb6el5yOADsqXFvCKzZZKedn62YM7uI/9bPtBxjxSTOnkcE42PJQ9mPNafsFd6G0yqxCFQRY0zvMqzOoXZxxLEuhdNVnY7xzFkAlfAWtJVN440xsGIdBhHOGdfXaRLYoml18iD+Ir/g8=</diagram></mxfile> \ No newline at end of file
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
deleted file mode 100644
index eda3572f..00000000
--- a/plugins/broadcast/interface.go
+++ /dev/null
@@ -1,7 +0,0 @@
-package broadcast
-
-import "github.com/spiral/roadrunner/v2/common/pubsub"
-
-type Broadcaster interface {
- GetDriver(key string) (pubsub.SubReader, error)
-}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
deleted file mode 100644
index 40263eaa..00000000
--- a/plugins/broadcast/plugin.go
+++ /dev/null
@@ -1,192 +0,0 @@
-package broadcast
-
-import (
- "fmt"
- "sync"
-
- endure "github.com/spiral/endure/pkg/container"
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/config"
- "github.com/spiral/roadrunner/v2/plugins/logger"
-)
-
-const (
- PluginName string = "broadcast"
- // driver is the mandatory field which should present in every storage
- driver string = "driver"
-
- // every driver should have config section for the local configuration
- conf string = "config"
-)
-
-type Plugin struct {
- sync.RWMutex
-
- cfg *Config
- cfgPlugin config.Configurer
- log logger.Logger
- // publishers implement Publisher interface
- // and able to receive a payload
- publishers map[string]pubsub.PubSub
- constructors map[string]pubsub.Constructor
-}
-
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
- const op = errors.Op("broadcast_plugin_init")
- if !cfg.Has(PluginName) {
- return errors.E(op, errors.Disabled)
- }
- p.cfg = &Config{}
- // unmarshal config section
- err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
- if err != nil {
- return errors.E(op, err)
- }
-
- p.publishers = make(map[string]pubsub.PubSub)
- p.constructors = make(map[string]pubsub.Constructor)
-
- p.log = log
- p.cfgPlugin = cfg
- return nil
-}
-
-func (p *Plugin) Serve() chan error {
- return make(chan error, 1)
-}
-
-func (p *Plugin) Stop() error {
- return nil
-}
-
-func (p *Plugin) Collects() []interface{} {
- return []interface{}{
- p.CollectPublishers,
- }
-}
-
-// CollectPublishers collect all plugins who implement pubsub.Publisher interface
-func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) {
- // key redis, value - interface
- p.constructors[name.Name()] = constructor
-}
-
-// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(m *pubsub.Message) error {
- p.Lock()
- defer p.Unlock()
-
- const op = errors.Op("broadcast_plugin_publish")
-
- // check if any publisher registered
- if len(p.publishers) > 0 {
- for j := range p.publishers {
- err := p.publishers[j].Publish(m)
- if err != nil {
- return errors.E(op, err)
- }
- }
- return nil
- } else {
- p.log.Warn("no publishers registered")
- }
-
- return nil
-}
-
-func (p *Plugin) PublishAsync(m *pubsub.Message) {
- // TODO(rustatian) channel here?
- go func() {
- p.Lock()
- defer p.Unlock()
- // check if any publisher registered
- if len(p.publishers) > 0 {
- for j := range p.publishers {
- err := p.publishers[j].Publish(m)
- if err != nil {
- p.log.Error("publishAsync", "error", err)
- // continue publishing to the other registered publishers
- continue
- }
- }
- } else {
- p.log.Warn("no publishers registered")
- }
- }()
-}
-
-func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
- const op = errors.Op("broadcast_plugin_get_driver")
-
- // choose a driver
- if val, ok := p.cfg.Data[key]; ok {
- // check type of the v
- // should be a map[string]interface{}
- switch t := val.(type) {
- // correct type
- case map[string]interface{}:
- if _, ok := t[driver]; !ok {
- panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val)))
- }
- default:
- return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation"))
- }
-
- // config key for the particular sub-driver broadcast.memcached.config
- configKey := fmt.Sprintf("%s.%s.%s", PluginName, key, conf)
-
- drName := val.(map[string]interface{})[driver]
-
- // driver name should be a string
- if drStr, ok := drName.(string); ok {
- if _, ok := p.constructors[drStr]; !ok {
- return nil, errors.E(op, errors.Errorf("no drivers with the requested name registered, registered: %s, requested: %s", p.publishers, drStr))
- }
-
- switch {
- // try local config first
- case p.cfgPlugin.Has(configKey):
- // we found a local configuration
- ps, err := p.constructors[drStr].PSConstruct(configKey)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save the initialized publisher channel
- // for the in-memory, register new publishers
- p.publishers[configKey] = ps
-
- return ps, nil
- case p.cfgPlugin.Has(key):
- // try global driver section after local
- ps, err := p.constructors[drStr].PSConstruct(key)
- if err != nil {
- return nil, errors.E(op, err)
- }
-
- // save the initialized publisher channel
- // for the in-memory, register new publishers
- p.publishers[configKey] = ps
-
- return ps, nil
- default:
- p.log.Error("can't find local or global configuration, this section will be skipped", "local: ", configKey, "global: ", key)
- }
- }
- }
- return nil, errors.E(op, errors.Str("could not find driver by provided key"))
-}
-
-func (p *Plugin) RPC() interface{} {
- return &rpc{
- plugin: p,
- log: p.log,
- }
-}
-
-func (p *Plugin) Name() string {
- return PluginName
-}
-
-func (p *Plugin) Available() {}
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
deleted file mode 100644
index 475076a0..00000000
--- a/plugins/broadcast/rpc.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package broadcast
-
-import (
- "github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/common/pubsub"
- "github.com/spiral/roadrunner/v2/plugins/logger"
- websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
-)
-
-// rpc collectors struct
-type rpc struct {
- plugin *Plugin
- log logger.Logger
-}
-
-// Publish ... msg is a proto decoded payload
-// see: root/proto
-func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
- const op = errors.Op("broadcast_publish")
-
- // just return in case of nil message
- if in == nil {
- out.Ok = false
- return nil
- }
-
- r.log.Debug("message published", "msg", in.String())
- msgLen := len(in.GetMessages())
-
- for i := 0; i < msgLen; i++ {
- for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
- if in.GetMessages()[i].GetTopics()[j] == "" {
- r.log.Warn("message with empty topic, skipping")
- // skip empty topics
- continue
- }
-
- tmp := &pubsub.Message{
- Topic: in.GetMessages()[i].GetTopics()[j],
- Payload: in.GetMessages()[i].GetPayload(),
- }
-
- err := r.plugin.Publish(tmp)
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
- }
- }
- }
-
- out.Ok = true
- return nil
-}
-
-// PublishAsync ...
-// see: root/proto
-func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
- // just return in case of nil message
- if in == nil {
- out.Ok = false
- return nil
- }
-
- r.log.Debug("message published", "msg", in.GetMessages())
-
- msgLen := len(in.GetMessages())
-
- for i := 0; i < msgLen; i++ {
- for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
- if in.GetMessages()[i].GetTopics()[j] == "" {
- r.log.Warn("message with empty topic, skipping")
- // skip empty topics
- continue
- }
-
- tmp := &pubsub.Message{
- Topic: in.GetMessages()[i].GetTopics()[j],
- Payload: in.GetMessages()[i].GetPayload(),
- }
-
- r.plugin.PublishAsync(tmp)
- }
- }
-
- out.Ok = true
- return nil
-}