summaryrefslogtreecommitdiff
path: root/plugins/broadcast/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-18 01:06:16 +0300
committerValery Piashchynski <[email protected]>2021-06-18 01:06:16 +0300
commitfe7bb0fe758d573fe353df028257ed66c6eccf66 (patch)
tree74392f8e61e96c85f0d8b684cfc08e3fc3664ae9 /plugins/broadcast/plugin.go
parent68ff941c4226074206ceed9c30bd95317aa0e9fc (diff)
- Rework main parts
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast/plugin.go')
-rw-r--r--plugins/broadcast/plugin.go135
1 files changed, 119 insertions, 16 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 3b771746..c43b2e4c 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -1,25 +1,37 @@
package broadcast
import (
+ "fmt"
"sync"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
"google.golang.org/protobuf/proto"
)
-const PluginName string = "broadcast"
+const (
+ PluginName string = "broadcast"
+ // driver is the mandatory field which should present in every storage
+ driver string = "driver"
+
+ redis string = "redis"
+ memory string = "memory"
+)
type Plugin struct {
sync.RWMutex
- log logger.Logger
+
+ cfg *Config
+ cfgPlugin config.Configurer
+ log logger.Logger
// publishers implement Publisher interface
// and able to receive a payload
- publishers map[string]pubsub.Publisher
+ publishers map[string]pubsub.PubSub
+ providers map[string]pubsub.PSProvider
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
@@ -27,9 +39,95 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
}
+ p.cfg = &Config{}
+ // unmarshal config section
+ err := cfg.UnmarshalKey(PluginName, &p.cfg.Data)
+ if err != nil {
+ return errors.E(op, err)
+ }
+
+ p.publishers = make(map[string]pubsub.PubSub)
+ p.providers = make(map[string]pubsub.PSProvider)
- p.publishers = make(map[string]pubsub.Publisher)
p.log = log
+ p.cfgPlugin = cfg
+ return nil
+}
+
+func (p *Plugin) Serve() chan error {
+ const op = errors.Op("broadcast_plugin_serve")
+ errCh := make(chan error, 1)
+
+ // iterate over config
+ for k, v := range p.cfg.Data {
+ if v == nil {
+ continue
+ }
+
+ switch t := v.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
+ return errCh
+ }
+ default:
+ p.log.Warn("wrong type detected in the configuration, please, check yaml indentation")
+ continue
+ }
+
+ // config key for the particular sub-driver kv.memcached
+ configKey := fmt.Sprintf("%s.%s", PluginName, k)
+
+ switch v.(map[string]interface{})[driver] {
+ case memory:
+ if _, ok := p.providers[memory]; !ok {
+ p.log.Warn("no memory drivers registered", "registered", p.publishers)
+ continue
+ }
+ ps, err := p.providers[memory].PSProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the pubsub
+ p.publishers[k] = ps
+ case redis:
+ if _, ok := p.providers[redis]; !ok {
+ p.log.Warn("no redis drivers registered", "registered", p.publishers)
+ continue
+ }
+
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ ps, err := p.providers[redis].PSProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the pubsub
+ p.publishers[k] = ps
+ case p.cfgPlugin.Has(redis):
+ ps, err := p.providers[redis].PSProvide(configKey)
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
+
+ // save the pubsub
+ p.publishers[k] = ps
+ continue
+ }
+ }
+ }
+
+ return errCh
+}
+
+func (p *Plugin) Stop() error {
return nil
}
@@ -40,8 +138,9 @@ func (p *Plugin) Collects() []interface{} {
}
// CollectPublishers collect all plugins who implement pubsub.Publisher interface
-func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Publisher) {
- p.publishers[name.Name()] = subscriber
+func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) {
+ // key redis, value - interface
+ p.providers[name.Name()] = subscriber
}
// Publish is an entry point to the websocket PUBSUB
@@ -88,21 +187,25 @@ func (p *Plugin) PublishAsync(m []byte) {
// Get payload
for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.publishers[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- p.log.Error("publish async error", "error", err)
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ p.publishers[j].PublishAsync(m)
}
- } else {
- p.log.Warn("no such broker", "available", p.publishers, "requested", msg.GetBroker())
+ return
}
+ p.log.Warn("no publishers registered")
}
}()
}
-func (p *Plugin) GetDriver(key string) pubsub.SubReader {
- println(key)
- return nil
+func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
+ const op = errors.Op("broadcast_plugin_get_driver")
+ // key - driver, default for example
+ // we should find `default` in the collected pubsubs providers
+ if pub, ok := p.publishers[key]; ok {
+ return pub, nil
+ }
+ return nil, errors.E(op, errors.Str("could not find driver by provided key"))
}
func (p *Plugin) RPC() interface{} {