summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-20 16:40:14 +0300
committerValery Piashchynski <[email protected]>2021-06-20 16:40:14 +0300
commit2dd30155de6faaf6005027d5337a840310c827f9 (patch)
treeaa6f0ce2d2db2047b7e729b16dd70d721f4bae55 /plugins/broadcast
parent25dfc0d837827d0d1c729d323dd651ca6163fe09 (diff)
- Update redis/memory pubsubs
- Rework internal message bus - Add new tests for the broadcast plugin and include them into the GA Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/plugin.go195
-rw-r--r--plugins/broadcast/rpc.go51
2 files changed, 116 insertions, 130 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index 3b420a4b..04a4fb80 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -4,13 +4,12 @@ import (
"fmt"
"sync"
+ "github.com/google/uuid"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
- websocketsv1beta "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
- "google.golang.org/protobuf/proto"
)
const (
@@ -55,78 +54,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
}
func (p *Plugin) Serve() chan error {
- const op = errors.Op("broadcast_plugin_serve")
- errCh := make(chan error, 1)
-
- // iterate over config
- for k, v := range p.cfg.Data {
- if v == nil {
- continue
- }
-
- // check type of the v
- // should be a map[string]interface{}
- switch t := v.(type) {
- // correct type
- case map[string]interface{}:
- if _, ok := t[driver]; !ok {
- errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
- return errCh
- }
- default:
- errCh <- errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation"))
- return errCh
- }
-
- // config key for the particular sub-driver kv.memcached
- configKey := fmt.Sprintf("%s.%s", PluginName, k)
-
- switch v.(map[string]interface{})[driver] {
- case memory:
- if _, ok := p.constructors[memory]; !ok {
- p.log.Warn("no memory drivers registered", "registered", p.publishers)
- continue
- }
- ps, err := p.constructors[memory].PSConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the pubsub
- p.publishers[k] = ps
- case redis:
- if _, ok := p.constructors[redis]; !ok {
- p.log.Warn("no redis drivers registered", "registered", p.publishers)
- continue
- }
-
- // first - try local configuration
- switch {
- case p.cfgPlugin.Has(configKey):
- ps, err := p.constructors[redis].PSConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the pubsub
- p.publishers[k] = ps
- case p.cfgPlugin.Has(redis):
- ps, err := p.constructors[redis].PSConstruct(configKey)
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
-
- // save the pubsub
- p.publishers[k] = ps
- continue
- }
- }
- }
-
- return errCh
+ return make(chan error)
}
func (p *Plugin) Stop() error {
@@ -140,61 +68,49 @@ func (p *Plugin) Collects() []interface{} {
}
// CollectPublishers collect all plugins who implement pubsub.Publisher interface
-func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Constructor) {
+func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) {
// key redis, value - interface
- p.constructors[name.Name()] = subscriber
+ p.constructors[name.Name()] = constructor
}
// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(m []byte) error {
+func (p *Plugin) Publish(m *pubsub.Message) error {
p.Lock()
defer p.Unlock()
const op = errors.Op("broadcast_plugin_publish")
- msg := &websocketsv1beta.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- return errors.E(op, err)
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if len(p.publishers) > 0 {
- for j := range p.publishers {
- err = p.publishers[j].Publish(m)
- if err != nil {
- return errors.E(op, err)
- }
+ // check if any publisher registered
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err := p.publishers[j].Publish(m)
+ if err != nil {
+ return errors.E(op, err)
}
-
- return nil
}
-
+ return nil
+ } else {
p.log.Warn("no publishers registered")
}
return nil
}
-func (p *Plugin) PublishAsync(m []byte) {
+func (p *Plugin) PublishAsync(m *pubsub.Message) {
go func() {
p.Lock()
defer p.Unlock()
- msg := &websocketsv1beta.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- p.log.Error("message unmarshal")
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if len(p.publishers) > 0 {
- for j := range p.publishers {
- p.publishers[j].PublishAsync(m)
+ // check if any publisher registered
+ if len(p.publishers) > 0 {
+ for j := range p.publishers {
+ err := p.publishers[j].Publish(m)
+ if err != nil {
+ p.log.Error("publishAsync", "error", err)
+ // continue publish to other registered publishers
+ continue
}
- return
}
+ } else {
p.log.Warn("no publishers registered")
}
}()
@@ -202,10 +118,67 @@ func (p *Plugin) PublishAsync(m []byte) {
func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
const op = errors.Op("broadcast_plugin_get_driver")
- // key - driver, default for example
- // we should find `default` in the collected pubsubs constructors
- if pub, ok := p.publishers[key]; ok {
- return pub, nil
+
+ // choose a driver
+ if val, ok := p.cfg.Data[key]; ok {
+ // check type of the v
+ // should be a map[string]interface{}
+ switch t := val.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val)))
+ }
+ default:
+ return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation"))
+ }
+
+ // config key for the particular sub-driver kv.memcached
+ configKey := fmt.Sprintf("%s.%s", PluginName, key)
+
+ switch val.(map[string]interface{})[driver] {
+ case memory:
+ if _, ok := p.constructors[memory]; !ok {
+ return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers))
+ }
+ ps, err := p.constructors[memory].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save the initialized publisher channel
+ // for the in-memory, register new publishers
+ p.publishers[uuid.NewString()] = ps
+
+ return ps, nil
+ case redis:
+ if _, ok := p.constructors[redis]; !ok {
+ return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers))
+ }
+
+ // first - try local configuration
+ switch {
+ case p.cfgPlugin.Has(configKey):
+ ps, err := p.constructors[redis].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save the pubsub under a config key
+ //
+ p.publishers[configKey] = ps
+ return ps, nil
+ case p.cfgPlugin.Has(redis):
+ ps, err := p.constructors[redis].PSConstruct(configKey)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+
+ // save the pubsub
+ p.publishers[configKey] = ps
+ return ps, nil
+ }
+ }
}
return nil, errors.E(op, errors.Str("could not find driver by provided key"))
}
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
index 4c27cdc3..2ee211f8 100644
--- a/plugins/broadcast/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -2,9 +2,9 @@ package broadcast
import (
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
- "google.golang.org/protobuf/proto"
)
// rpc collectors struct
@@ -14,7 +14,7 @@ type rpc struct {
}
// Publish ... msg is a proto decoded payload
-// see: pkg/pubsub/message.fbs
+// see: root/proto
func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error {
const op = errors.Op("broadcast_publish")
@@ -28,15 +28,23 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro
msgLen := len(in.GetMessages())
for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- return errors.E(op, err)
- }
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
- err = r.plugin.Publish(bb)
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
+ err := r.plugin.Publish(tmp)
+ if err != nil {
+ out.Ok = false
+ return errors.E(op, err)
+ }
}
}
@@ -45,10 +53,8 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro
}
// PublishAsync ...
-// see: pkg/pubsub/message.fbs
+// see: root/proto
func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error {
- const op = errors.Op("publish_async")
-
// just return in case of nil message
if in == nil {
out.Ok = false
@@ -60,13 +66,20 @@ func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response)
msgLen := len(in.GetMessages())
for i := 0; i < msgLen; i++ {
- bb, err := proto.Marshal(in.GetMessages()[i])
- if err != nil {
- out.Ok = false
- return errors.E(op, err)
- }
+ for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ {
+ if in.GetMessages()[i].GetTopics()[j] == "" {
+ r.log.Warn("message with empty topic, skipping")
+ // skip empty topics
+ continue
+ }
+
+ tmp := &pubsub.Message{
+ Topic: in.GetMessages()[i].GetTopics()[j],
+ Payload: in.GetMessages()[i].GetPayload(),
+ }
- r.plugin.PublishAsync(bb)
+ r.plugin.PublishAsync(tmp)
+ }
}
out.Ok = true