summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
committerValery Piashchynski <[email protected]>2021-06-17 19:24:35 +0300
commit68ff941c4226074206ceed9c30bd95317aa0e9fc (patch)
tree693306256281cccefb29f4eedb7f617a9022154e /plugins/websockets/plugin.go
parent25e0841c6aa5e2686da5b9f74e3d77d3814ff592 (diff)
- Initial broadcast commit
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go194
1 files changed, 70 insertions, 124 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 8b708187..cf861c72 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -2,7 +2,6 @@ package websockets
import (
"context"
- "fmt"
"net/http"
"sync"
"time"
@@ -12,10 +11,10 @@ import (
json "github.com/json-iterator/go"
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/pkg/interface/broadcast"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
@@ -26,7 +25,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "google.golang.org/protobuf/proto"
)
const (
@@ -36,9 +34,11 @@ const (
type Plugin struct {
sync.RWMutex
// Collection with all available pubsubs
- pubsubs map[string]pubsub.PubSub
+ //pubsubs map[string]pubsub.PubSub
- psProviders map[string]pubsub.PSProvider
+ //psProviders map[string]pubsub.PSProvider
+
+ subReaders map[string]pubsub.SubReader
cfg *Config
cfgPlugin config.Configurer
@@ -60,7 +60,7 @@ type Plugin struct {
accessValidator validator.AccessValidatorFn
}
-func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server) error {
+func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Server, b broadcast.Broadcaster) error {
const op = errors.Op("websockets_plugin_init")
if !cfg.Has(PluginName) {
return errors.E(op, errors.Disabled)
@@ -72,8 +72,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.cfg.InitDefault()
- p.pubsubs = make(map[string]pubsub.PubSub)
- p.psProviders = make(map[string]pubsub.PSProvider)
+ //p.pubsubs = make(map[string]pubsub.PubSub)
+ //p.psProviders = make(map[string]pubsub.PSProvider)
+
+ p.subReaders = make(map[string]pubsub.SubReader)
p.log = log
p.cfgPlugin = cfg
@@ -96,11 +98,11 @@ func (p *Plugin) Serve() chan error {
errCh := make(chan error, 1)
const op = errors.Op("websockets_plugin_serve")
- err := p.initPubSubs()
- if err != nil {
- errCh <- errors.E(op, err)
- return errCh
- }
+ //err := p.initPubSubs()
+ //if err != nil {
+ // errCh <- errors.E(op, err)
+ // return errCh
+ //}
go func() {
var err error
@@ -122,11 +124,11 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
- p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log)
+ p.workersPool = pool.NewWorkersPool(p.subReaders, &p.connections, p.log)
// run all pubsubs drivers
- for _, v := range p.pubsubs {
- go func(ps pubsub.PubSub) {
+ for _, v := range p.subReaders {
+ go func(ps pubsub.SubReader) {
for {
select {
case <-p.serveExit:
@@ -146,53 +148,53 @@ func (p *Plugin) Serve() chan error {
return errCh
}
-func (p *Plugin) initPubSubs() error {
- for i := 0; i < len(p.cfg.PubSubs); i++ {
- // don't need to have a section for the in-memory
- if p.cfg.PubSubs[i] == "memory" {
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- r, err := provider.PSProvide("")
- if err != nil {
- return err
- }
-
- // append default in-memory provider
- p.pubsubs["memory"] = r
- }
- continue
- }
- // key - memory, redis
- if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
- // try local key
- switch {
- // try local config first
- case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
- r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
- r, err := provider.PSProvide(p.cfg.PubSubs[i])
- if err != nil {
- return err
- }
-
- // append redis provider
- p.pubsubs[p.cfg.PubSubs[i]] = r
- default:
- return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
- }
- } else {
- // no such driver
- p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
- }
- }
-
- return nil
-}
+//func (p *Plugin) initPubSubs() error {
+// for i := 0; i < len(p.cfg.PubSubs); i++ {
+// // don't need to have a section for the in-memory
+// if p.cfg.PubSubs[i] == "memory" {
+// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+// r, err := provider.PSProvide("")
+// if err != nil {
+// return err
+// }
+//
+// // append default in-memory provider
+// p.pubsubs["memory"] = r
+// }
+// continue
+// }
+// // key - memory, redis
+// if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+// // try local key
+// switch {
+// // try local config first
+// case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
+// r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
+// if err != nil {
+// return err
+// }
+//
+// // append redis provider
+// p.pubsubs[p.cfg.PubSubs[i]] = r
+// case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
+// r, err := provider.PSProvide(p.cfg.PubSubs[i])
+// if err != nil {
+// return err
+// }
+//
+// // append redis provider
+// p.pubsubs[p.cfg.PubSubs[i]] = r
+// default:
+// return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
+// }
+// } else {
+// // no such driver
+// p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
+// }
+// }
+//
+// return nil
+//}
func (p *Plugin) Stop() error {
// close workers pool
@@ -210,26 +212,19 @@ func (p *Plugin) Stop() error {
func (p *Plugin) Collects() []interface{} {
return []interface{}{
- p.GetPublishers,
+ p.GetSubsReader,
}
}
func (p *Plugin) Available() {}
-func (p *Plugin) RPC() interface{} {
- return &rpc{
- plugin: p,
- log: p.log,
- }
-}
-
func (p *Plugin) Name() string {
return PluginName
}
-// GetPublishers collects all pubsubs
-func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) {
- p.psProviders[name.Name()] = pub
+// GetSubsReader collects all plugins which implement SubReader interface
+func (p *Plugin) GetSubsReader(name endure.Named, pub pubsub.SubReader) {
+ p.subReaders[name.Name()] = pub
}
func (p *Plugin) Middleware(next http.Handler) http.Handler {
@@ -277,7 +272,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.connections.Store(connectionID, safeConn)
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, nil, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
err = e.StartCommandLoop()
@@ -361,55 +356,6 @@ func (p *Plugin) Reset() error {
return nil
}
-// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(m []byte) error {
- p.Lock()
- defer p.Unlock()
-
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- return err
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- return nil
-}
-
-func (p *Plugin) PublishAsync(m []byte) {
- go func() {
- p.Lock()
- defer p.Unlock()
- msg := &websocketsv1.Message{}
- err := proto.Unmarshal(m, msg)
- if err != nil {
- p.log.Error("message unmarshal")
- }
-
- // Get payload
- for i := 0; i < len(msg.GetTopics()); i++ {
- if br, ok := p.pubsubs[msg.GetBroker()]; ok {
- err := br.Publish(m)
- if err != nil {
- p.log.Error("publish async error", "error", err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
- }
- }
- }()
-}
-
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
const op = errors.Op("access_validator")