diff options
author | Valery Piashchynski <[email protected]> | 2021-06-18 12:00:05 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-18 12:00:05 +0300 |
commit | 9e8bad3988c1fec2e545898d529446f7b93e537b (patch) | |
tree | d91159b8c78c8add1981641499ef81c821d5d363 /plugins/broadcast | |
parent | fe7bb0fe758d573fe353df028257ed66c6eccf66 (diff) |
- Rework finished
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast')
-rw-r--r-- | plugins/broadcast/interface.go | 7 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 32 | ||||
-rw-r--r-- | plugins/broadcast/rpc.go | 7 |
3 files changed, 27 insertions, 19 deletions
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go new file mode 100644 index 00000000..46709d71 --- /dev/null +++ b/plugins/broadcast/interface.go @@ -0,0 +1,7 @@ +package broadcast + +import "github.com/spiral/roadrunner/v2/pkg/pubsub" + +type Broadcaster interface { + GetDriver(key string) (pubsub.SubReader, error) +} diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index c43b2e4c..612b6a47 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -6,10 +6,10 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + websocketsv1beta "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "google.golang.org/protobuf/proto" ) @@ -30,8 +30,8 @@ type Plugin struct { log logger.Logger // publishers implement Publisher interface // and able to receive a payload - publishers map[string]pubsub.PubSub - providers map[string]pubsub.PSProvider + publishers map[string]pubsub.PubSub + constructors map[string]pubsub.Constructor } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { @@ -47,7 +47,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { } p.publishers = make(map[string]pubsub.PubSub) - p.providers = make(map[string]pubsub.PSProvider) + p.constructors = make(map[string]pubsub.Constructor) p.log = log p.cfgPlugin = cfg @@ -64,6 +64,8 @@ func (p *Plugin) Serve() chan error { continue } + // check type of the v + // should be a map[string]interface{} switch t := v.(type) { // correct type case map[string]interface{}: @@ -81,11 +83,11 @@ func (p *Plugin) Serve() chan error { switch v.(map[string]interface{})[driver] { case memory: - if _, ok := p.providers[memory]; !ok { + if _, ok := p.constructors[memory]; !ok { p.log.Warn("no memory drivers registered", "registered", p.publishers) continue } - ps, err := p.providers[memory].PSProvide(configKey) + ps, err := p.constructors[memory].PSConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -94,7 +96,7 @@ func (p *Plugin) Serve() chan error { // save the pubsub p.publishers[k] = ps case redis: - if _, ok := p.providers[redis]; !ok { + if _, ok := p.constructors[redis]; !ok { p.log.Warn("no redis drivers registered", "registered", p.publishers) continue } @@ -102,7 +104,7 @@ func (p *Plugin) Serve() chan error { // first - try local configuration switch { case p.cfgPlugin.Has(configKey): - ps, err := p.providers[redis].PSProvide(configKey) + ps, err := p.constructors[redis].PSConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -111,7 +113,7 @@ func (p *Plugin) Serve() chan error { // save the pubsub p.publishers[k] = ps case p.cfgPlugin.Has(redis): - ps, err := p.providers[redis].PSProvide(configKey) + ps, err := p.constructors[redis].PSConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -138,9 +140,9 @@ func (p *Plugin) Collects() []interface{} { } // CollectPublishers collect all plugins who implement pubsub.Publisher interface -func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) { +func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Constructor) { // key redis, value - interface - p.providers[name.Name()] = subscriber + p.constructors[name.Name()] = subscriber } // Publish is an entry point to the websocket PUBSUB @@ -150,7 +152,7 @@ func (p *Plugin) Publish(m []byte) error { const op = errors.Op("broadcast_plugin_publish") - msg := &websocketsv1.Message{} + msg := &websocketsv1beta.Message{} err := proto.Unmarshal(m, msg) if err != nil { return errors.E(op, err) @@ -179,7 +181,7 @@ func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - msg := &websocketsv1.Message{} + msg := &websocketsv1beta.Message{} err := proto.Unmarshal(m, msg) if err != nil { p.log.Error("message unmarshal") @@ -201,7 +203,7 @@ func (p *Plugin) PublishAsync(m []byte) { func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { const op = errors.Op("broadcast_plugin_get_driver") // key - driver, default for example - // we should find `default` in the collected pubsubs providers + // we should find `default` in the collected pubsubs constructors if pub, ok := p.publishers[key]; ok { return pub, nil } diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go index fa853421..4c27cdc3 100644 --- a/plugins/broadcast/rpc.go +++ b/plugins/broadcast/rpc.go @@ -2,8 +2,8 @@ package broadcast import ( "github.com/spiral/errors" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "google.golang.org/protobuf/proto" ) @@ -24,8 +24,7 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro return nil } - r.log.Debug("message published", "msg", in.Messages) - + r.log.Debug("message published", "msg", in.String()) msgLen := len(in.GetMessages()) for i := 0; i < msgLen; i++ { @@ -56,7 +55,7 @@ func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) return nil } - r.log.Debug("message published", "msg", in.Messages) + r.log.Debug("message published", "msg", in.GetMessages()) msgLen := len(in.GetMessages()) |