summaryrefslogtreecommitdiff
path: root/plugins/broadcast
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-18 12:00:05 +0300
committerValery Piashchynski <[email protected]>2021-06-18 12:00:05 +0300
commit9e8bad3988c1fec2e545898d529446f7b93e537b (patch)
treed91159b8c78c8add1981641499ef81c821d5d363 /plugins/broadcast
parentfe7bb0fe758d573fe353df028257ed66c6eccf66 (diff)
- Rework finished
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/broadcast')
-rw-r--r--plugins/broadcast/interface.go7
-rw-r--r--plugins/broadcast/plugin.go32
-rw-r--r--plugins/broadcast/rpc.go7
3 files changed, 27 insertions, 19 deletions
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
new file mode 100644
index 00000000..46709d71
--- /dev/null
+++ b/plugins/broadcast/interface.go
@@ -0,0 +1,7 @@
+package broadcast
+
+import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+
+type Broadcaster interface {
+ GetDriver(key string) (pubsub.SubReader, error)
+}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index c43b2e4c..612b6a47 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -6,10 +6,10 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1beta "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
"google.golang.org/protobuf/proto"
)
@@ -30,8 +30,8 @@ type Plugin struct {
log logger.Logger
// publishers implement Publisher interface
// and able to receive a payload
- publishers map[string]pubsub.PubSub
- providers map[string]pubsub.PSProvider
+ publishers map[string]pubsub.PubSub
+ constructors map[string]pubsub.Constructor
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
@@ -47,7 +47,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
}
p.publishers = make(map[string]pubsub.PubSub)
- p.providers = make(map[string]pubsub.PSProvider)
+ p.constructors = make(map[string]pubsub.Constructor)
p.log = log
p.cfgPlugin = cfg
@@ -64,6 +64,8 @@ func (p *Plugin) Serve() chan error {
continue
}
+ // check type of the v
+ // should be a map[string]interface{}
switch t := v.(type) {
// correct type
case map[string]interface{}:
@@ -81,11 +83,11 @@ func (p *Plugin) Serve() chan error {
switch v.(map[string]interface{})[driver] {
case memory:
- if _, ok := p.providers[memory]; !ok {
+ if _, ok := p.constructors[memory]; !ok {
p.log.Warn("no memory drivers registered", "registered", p.publishers)
continue
}
- ps, err := p.providers[memory].PSProvide(configKey)
+ ps, err := p.constructors[memory].PSConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -94,7 +96,7 @@ func (p *Plugin) Serve() chan error {
// save the pubsub
p.publishers[k] = ps
case redis:
- if _, ok := p.providers[redis]; !ok {
+ if _, ok := p.constructors[redis]; !ok {
p.log.Warn("no redis drivers registered", "registered", p.publishers)
continue
}
@@ -102,7 +104,7 @@ func (p *Plugin) Serve() chan error {
// first - try local configuration
switch {
case p.cfgPlugin.Has(configKey):
- ps, err := p.providers[redis].PSProvide(configKey)
+ ps, err := p.constructors[redis].PSConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -111,7 +113,7 @@ func (p *Plugin) Serve() chan error {
// save the pubsub
p.publishers[k] = ps
case p.cfgPlugin.Has(redis):
- ps, err := p.providers[redis].PSProvide(configKey)
+ ps, err := p.constructors[redis].PSConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -138,9 +140,9 @@ func (p *Plugin) Collects() []interface{} {
}
// CollectPublishers collect all plugins who implement pubsub.Publisher interface
-func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) {
+func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Constructor) {
// key redis, value - interface
- p.providers[name.Name()] = subscriber
+ p.constructors[name.Name()] = subscriber
}
// Publish is an entry point to the websocket PUBSUB
@@ -150,7 +152,7 @@ func (p *Plugin) Publish(m []byte) error {
const op = errors.Op("broadcast_plugin_publish")
- msg := &websocketsv1.Message{}
+ msg := &websocketsv1beta.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
return errors.E(op, err)
@@ -179,7 +181,7 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- msg := &websocketsv1.Message{}
+ msg := &websocketsv1beta.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
p.log.Error("message unmarshal")
@@ -201,7 +203,7 @@ func (p *Plugin) PublishAsync(m []byte) {
func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
const op = errors.Op("broadcast_plugin_get_driver")
// key - driver, default for example
- // we should find `default` in the collected pubsubs providers
+ // we should find `default` in the collected pubsubs constructors
if pub, ok := p.publishers[key]; ok {
return pub, nil
}
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
index fa853421..4c27cdc3 100644
--- a/plugins/broadcast/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -2,8 +2,8 @@ package broadcast
import (
"github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
"google.golang.org/protobuf/proto"
)
@@ -24,8 +24,7 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro
return nil
}
- r.log.Debug("message published", "msg", in.Messages)
-
+ r.log.Debug("message published", "msg", in.String())
msgLen := len(in.GetMessages())
for i := 0; i < msgLen; i++ {
@@ -56,7 +55,7 @@ func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response)
return nil
}
- r.log.Debug("message published", "msg", in.Messages)
+ r.log.Debug("message published", "msg", in.GetMessages())
msgLen := len(in.GetMessages())