diff options
author | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
commit | 2dd30155de6faaf6005027d5337a840310c827f9 (patch) | |
tree | aa6f0ce2d2db2047b7e729b16dd70d721f4bae55 /plugins/broadcast | |
parent | 25dfc0d837827d0d1c729d323dd651ca6163fe09 (diff) |
- Update redis/memory pubsubs
- Rework internal message bus
- Add new tests for the broadcast plugin and include them into the GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast')
-rw-r--r-- | plugins/broadcast/plugin.go | 195 | ||||
-rw-r--r-- | plugins/broadcast/rpc.go | 51 |
2 files changed, 116 insertions, 130 deletions
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index 3b420a4b..04a4fb80 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -4,13 +4,12 @@ import ( "fmt" "sync" + "github.com/google/uuid" endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" - websocketsv1beta "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" - "google.golang.org/protobuf/proto" ) const ( @@ -55,78 +54,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { } func (p *Plugin) Serve() chan error { - const op = errors.Op("broadcast_plugin_serve") - errCh := make(chan error, 1) - - // iterate over config - for k, v := range p.cfg.Data { - if v == nil { - continue - } - - // check type of the v - // should be a map[string]interface{} - switch t := v.(type) { - // correct type - case map[string]interface{}: - if _, ok := t[driver]; !ok { - errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) - return errCh - } - default: - errCh <- errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation")) - return errCh - } - - // config key for the particular sub-driver kv.memcached - configKey := fmt.Sprintf("%s.%s", PluginName, k) - - switch v.(map[string]interface{})[driver] { - case memory: - if _, ok := p.constructors[memory]; !ok { - p.log.Warn("no memory drivers registered", "registered", p.publishers) - continue - } - ps, err := p.constructors[memory].PSConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the pubsub - p.publishers[k] = ps - case redis: - if _, ok := p.constructors[redis]; !ok { - p.log.Warn("no redis drivers registered", "registered", p.publishers) - continue - } - - // first - try local configuration - switch { - case p.cfgPlugin.Has(configKey): - ps, err := p.constructors[redis].PSConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the pubsub - p.publishers[k] = ps - case p.cfgPlugin.Has(redis): - ps, err := p.constructors[redis].PSConstruct(configKey) - if err != nil { - errCh <- errors.E(op, err) - return errCh - } - - // save the pubsub - p.publishers[k] = ps - continue - } - } - } - - return errCh + return make(chan error) } func (p *Plugin) Stop() error { @@ -140,61 +68,49 @@ func (p *Plugin) Collects() []interface{} { } // CollectPublishers collect all plugins who implement pubsub.Publisher interface -func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Constructor) { +func (p *Plugin) CollectPublishers(name endure.Named, constructor pubsub.Constructor) { // key redis, value - interface - p.constructors[name.Name()] = subscriber + p.constructors[name.Name()] = constructor } // Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(m []byte) error { +func (p *Plugin) Publish(m *pubsub.Message) error { p.Lock() defer p.Unlock() const op = errors.Op("broadcast_plugin_publish") - msg := &websocketsv1beta.Message{} - err := proto.Unmarshal(m, msg) - if err != nil { - return errors.E(op, err) - } - - // Get payload - for i := 0; i < len(msg.GetTopics()); i++ { - if len(p.publishers) > 0 { - for j := range p.publishers { - err = p.publishers[j].Publish(m) - if err != nil { - return errors.E(op, err) - } + // check if any publisher registered + if len(p.publishers) > 0 { + for j := range p.publishers { + err := p.publishers[j].Publish(m) + if err != nil { + return errors.E(op, err) } - - return nil } - + return nil + } else { p.log.Warn("no publishers registered") } return nil } -func (p *Plugin) PublishAsync(m []byte) { +func (p *Plugin) PublishAsync(m *pubsub.Message) { go func() { p.Lock() defer p.Unlock() - msg := &websocketsv1beta.Message{} - err := proto.Unmarshal(m, msg) - if err != nil { - p.log.Error("message unmarshal") - } - - // Get payload - for i := 0; i < len(msg.GetTopics()); i++ { - if len(p.publishers) > 0 { - for j := range p.publishers { - p.publishers[j].PublishAsync(m) + // check if any publisher registered + if len(p.publishers) > 0 { + for j := range p.publishers { + err := p.publishers[j].Publish(m) + if err != nil { + p.log.Error("publishAsync", "error", err) + // continue publish to other registered publishers + continue } - return } + } else { p.log.Warn("no publishers registered") } }() @@ -202,10 +118,67 @@ func (p *Plugin) PublishAsync(m []byte) { func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { const op = errors.Op("broadcast_plugin_get_driver") - // key - driver, default for example - // we should find `default` in the collected pubsubs constructors - if pub, ok := p.publishers[key]; ok { - return pub, nil + + // choose a driver + if val, ok := p.cfg.Data[key]; ok { + // check type of the v + // should be a map[string]interface{} + switch t := val.(type) { + // correct type + case map[string]interface{}: + if _, ok := t[driver]; !ok { + panic(errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", val))) + } + default: + return nil, errors.E(op, errors.Str("wrong type detected in the configuration, please, check yaml indentation")) + } + + // config key for the particular sub-driver kv.memcached + configKey := fmt.Sprintf("%s.%s", PluginName, key) + + switch val.(map[string]interface{})[driver] { + case memory: + if _, ok := p.constructors[memory]; !ok { + return nil, errors.E(op, errors.Errorf("no memory drivers registered, registered: %s", p.publishers)) + } + ps, err := p.constructors[memory].PSConstruct(configKey) + if err != nil { + return nil, errors.E(op, err) + } + + // save the initialized publisher channel + // for the in-memory, register new publishers + p.publishers[uuid.NewString()] = ps + + return ps, nil + case redis: + if _, ok := p.constructors[redis]; !ok { + return nil, errors.E(op, errors.Errorf("no redis drivers registered, registered: %s", p.publishers)) + } + + // first - try local configuration + switch { + case p.cfgPlugin.Has(configKey): + ps, err := p.constructors[redis].PSConstruct(configKey) + if err != nil { + return nil, errors.E(op, err) + } + + // save the pubsub under a config key + // + p.publishers[configKey] = ps + return ps, nil + case p.cfgPlugin.Has(redis): + ps, err := p.constructors[redis].PSConstruct(configKey) + if err != nil { + return nil, errors.E(op, err) + } + + // save the pubsub + p.publishers[configKey] = ps + return ps, nil + } + } } return nil, errors.E(op, errors.Str("could not find driver by provided key")) } diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go index 4c27cdc3..2ee211f8 100644 --- a/plugins/broadcast/rpc.go +++ b/plugins/broadcast/rpc.go @@ -2,9 +2,9 @@ package broadcast import ( "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" - "google.golang.org/protobuf/proto" ) // rpc collectors struct @@ -14,7 +14,7 @@ type rpc struct { } // Publish ... msg is a proto decoded payload -// see: pkg/pubsub/message.fbs +// see: root/proto func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) error { const op = errors.Op("broadcast_publish") @@ -28,15 +28,23 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro msgLen := len(in.GetMessages()) for i := 0; i < msgLen; i++ { - bb, err := proto.Marshal(in.GetMessages()[i]) - if err != nil { - return errors.E(op, err) - } + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } - err = r.plugin.Publish(bb) - if err != nil { - out.Ok = false - return errors.E(op, err) + err := r.plugin.Publish(tmp) + if err != nil { + out.Ok = false + return errors.E(op, err) + } } } @@ -45,10 +53,8 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro } // PublishAsync ... -// see: pkg/pubsub/message.fbs +// see: root/proto func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) error { - const op = errors.Op("publish_async") - // just return in case of nil message if in == nil { out.Ok = false @@ -60,13 +66,20 @@ func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) msgLen := len(in.GetMessages()) for i := 0; i < msgLen; i++ { - bb, err := proto.Marshal(in.GetMessages()[i]) - if err != nil { - out.Ok = false - return errors.E(op, err) - } + for j := 0; j < len(in.GetMessages()[i].GetTopics()); j++ { + if in.GetMessages()[i].GetTopics()[j] == "" { + r.log.Warn("message with empty topic, skipping") + // skip empty topics + continue + } + + tmp := &pubsub.Message{ + Topic: in.GetMessages()[i].GetTopics()[j], + Payload: in.GetMessages()[i].GetPayload(), + } - r.plugin.PublishAsync(bb) + r.plugin.PublishAsync(tmp) + } } out.Ok = true |