summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-14 16:39:02 +0300
committerValery Piashchynski <[email protected]>2021-06-14 16:39:02 +0300
commit75ab1e16c64cfd0a6424fe4c546fdbc5e1b992dd (patch)
tree1e9a910071d20021ad0f7ef4fe6099bac6a341ef /plugins/websockets/plugin.go
parentdc8ed203c247afd684f198ebbac103a10bfad72a (diff)
- Rework redis with ws plugins
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go77
1 files changed, 67 insertions, 10 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 6ddd609c..c53491b4 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -2,6 +2,7 @@ package websockets
import (
"context"
+ "fmt"
"net/http"
"sync"
"time"
@@ -23,7 +24,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
- "github.com/spiral/roadrunner/v2/plugins/websockets/memory"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
"google.golang.org/protobuf/proto"
@@ -38,8 +38,11 @@ type Plugin struct {
// Collection with all available pubsubs
pubsubs map[string]pubsub.PubSub
- cfg *Config
- log logger.Logger
+ psProviders map[string]pubsub.PSProvider
+
+ cfg *Config
+ cfgPlugin config.Configurer
+ log logger.Logger
// global connections map
connections sync.Map
@@ -69,9 +72,12 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
}
p.cfg.InitDefault()
-
p.pubsubs = make(map[string]pubsub.PubSub)
+ p.psProviders = make(map[string]pubsub.PSProvider)
+
p.log = log
+ p.cfgPlugin = cfg
+
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
ReadBufferSize: 1024,
@@ -80,14 +86,18 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
- // attach default driver
- p.pubsubs["memory"] = memory.NewInMemory(p.log)
-
return nil
}
func (p *Plugin) Serve() chan error {
errCh := make(chan error)
+ const op = errors.Op("websockets_plugin_serve")
+
+ err := p.initPubSubs()
+ if err != nil {
+ errCh <- errors.E(op, err)
+ return errCh
+ }
go func() {
var err error
@@ -133,6 +143,54 @@ func (p *Plugin) Serve() chan error {
return errCh
}
+func (p *Plugin) initPubSubs() error {
+ for i := 0; i < len(p.cfg.PubSubs); i++ {
+ // don't need to have a section for the in-memory
+ if p.cfg.PubSubs[i] == "memory" {
+ if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+ r, err := provider.PSProvide("")
+ if err != nil {
+ return err
+ }
+
+ // append default in-memory provider
+ p.pubsubs["memory"] = r
+ }
+ continue
+ }
+ // key - memory, redis
+ if provider, ok := p.psProviders[p.cfg.PubSubs[i]]; ok {
+ // try local key
+ switch {
+ // try local config first
+ case p.cfgPlugin.Has(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i])):
+ r, err := provider.PSProvide(fmt.Sprintf("%s.%s", PluginName, p.cfg.PubSubs[i]))
+ if err != nil {
+ return err
+ }
+
+ // append redis provider
+ p.pubsubs[p.cfg.PubSubs[i]] = r
+ case p.cfgPlugin.Has(p.cfg.PubSubs[i]):
+ r, err := provider.PSProvide(p.cfg.PubSubs[i])
+ if err != nil {
+ return err
+ }
+
+ // append redis provider
+ p.pubsubs[p.cfg.PubSubs[i]] = r
+ default:
+ return errors.Errorf("could not find configuration sections for the %s", p.cfg.PubSubs[i])
+ }
+ } else {
+ // no such driver
+ p.log.Warn("no such driver", "requested", p.cfg.PubSubs[i], "available", p.psProviders)
+ }
+ }
+
+ return nil
+}
+
func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
@@ -167,8 +225,8 @@ func (p *Plugin) Name() string {
}
// GetPublishers collects all pubsubs
-func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PubSub) {
- p.pubsubs[name.Name()] = pub
+func (p *Plugin) GetPublishers(name endure.Named, pub pubsub.PSProvider) {
+ p.psProviders[name.Name()] = pub
}
func (p *Plugin) Middleware(next http.Handler) http.Handler {
@@ -389,7 +447,6 @@ func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValid
}
}
-// go:inline
func exec(ctx []byte, pool phpPool.Pool) (*validator.AccessValidator, error) {
const op = errors.Op("exec")
pd := payload.Payload{