diff options
author | Valery Piashchynski <[email protected]> | 2021-06-14 16:39:02 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-14 16:39:02 +0300 |
commit | 75ab1e16c64cfd0a6424fe4c546fdbc5e1b992dd (patch) | |
tree | 1e9a910071d20021ad0f7ef4fe6099bac6a341ef /plugins/websockets/plugin.go | |
parent | dc8ed203c247afd684f198ebbac103a10bfad72a (diff) |
- Rework redis with ws plugins
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r-- | plugins/websockets/plugin.go | 77 |
1 files changed, 67 insertions, 10 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 6ddd609c..c53491b4 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -2,6 +2,7 @@ package websockets import ( "context" + "fmt" "net/http" "sync" "time" @@ -23,7 +24,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/server" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" - "github.com/spiral/roadrunner/v2/plugins/websockets/memory" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" "google.golang.org/protobuf/proto" @@ -38,8 +38,11 @@ type Plugin struct { // Collection with all available pubsubs pubsubs map[string]pubsub.PubSub - cfg *Config - log logger.Logger + psProviders map[string]pubsub.PSProvider + + cfg *Config + cfgPlugin config.Configurer + log logger.Logger // global connections map connections sync.Map @@ -69,9 +72,12 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se } p.cfg.InitDefault() - p.pubsubs = make(map[string]pubsub.PubSub) + p.psProviders = make(map[string]pubsub.PSProvider) + p.log = log + p.cfgPlugin = cfg + p.wsUpgrade = &websocket.Upgrader{ HandshakeTimeout: time.Second * 60, ReadBufferSize: 1024, @@ -80,14 +86,18 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.serveExit = make(chan struct{}) p.server = server - // attach default driver - p.pubsubs["memory"] = memory.NewInMemory(p.log) - return nil } func (p *Plugin) Serve() chan error { errCh := make(chan error) + const op = errors.Op("websockets_plugin_serve") + + err := p.initPubSubs() + if err != nil { + errCh <- errors.E(op, err) + return errCh + } go func() { var err error @@ -133,6 +143,54 @@ func (p *Plugin) Serve() chan error { return errCh } +func (p *Plugin) initPubSubs() error { + for i := 0; i < len(p.cfg.PubSubs); i++ { + // don't need to have a section for the in-memory + if p.cfg.PubSubs[i] == "memory" { + if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { + r, err := provider.PSProvide("") + if err != nil { + return err + } + + // append default in-memory provider + p.pubsubs["memory"] = r + } + continue + } + // key - memory, redis + if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok { + // try local key + switch { + // try local config first + case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])): + r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])) + if err != nil { + return err + } + + // append redis provider + p.pubsubs[p.cfg.PubSubs[i]] = r + case p.cfgPlugin.Has(p.cfg.PubSubs[i]): + r, err := provider.PSProvide(p.cfg.PubSubs[i]) + if err != nil { + return err + } + + // append redis provider + p.pubsubs[p.cfg.PubSubs[i]] = r + default: + return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i]) + } + } else { + // no such driver + p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders) + } + } + + return nil +} + func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() @@ -167,8 +225,8 @@ func (p *Plugin) Name() string { } // GetPublishers collects all pubsubs -func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) { - p.pubsubs[name.Name()] = pub +func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) { + p.psProviders[name.Name()] = pub } func (p *Plugin) Middleware(next http.Handler) http.Handler { @@ -389,7 +447,6 @@ func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValid } } -// go:inline func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) { const op = errors.Op("exec") pd := payload.Payload{ |