summaryrefslogtreecommitdiff
path: root/plugins
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-18 12:00:05 +0300
committerValery Piashchynski <[email protected]>2021-06-18 12:00:05 +0300
commit9e8bad3988c1fec2e545898d529446f7b93e537b (patch)
treed91159b8c78c8add1981641499ef81c821d5d363 /plugins
parentfe7bb0fe758d573fe353df028257ed66c6eccf66 (diff)
- Rework finished
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r--plugins/broadcast/interface.go7
-rw-r--r--plugins/broadcast/plugin.go32
-rw-r--r--plugins/broadcast/rpc.go7
-rw-r--r--plugins/kv/config.go2
-rw-r--r--plugins/kv/drivers/boltdb/driver.go2
-rw-r--r--plugins/kv/drivers/boltdb/plugin.go2
-rw-r--r--plugins/kv/drivers/memcached/driver.go2
-rw-r--r--plugins/kv/drivers/memcached/plugin.go2
-rw-r--r--plugins/kv/interface.go15
-rw-r--r--plugins/kv/plugin.go55
-rw-r--r--plugins/kv/rpc.go2
-rw-r--r--plugins/memory/kv.go2
-rw-r--r--plugins/memory/plugin.go6
-rw-r--r--plugins/memory/pubsub.go4
-rw-r--r--plugins/redis/fanin.go2
-rw-r--r--plugins/redis/kv.go2
-rw-r--r--plugins/redis/plugin.go8
-rw-r--r--plugins/redis/pubsub.go4
-rw-r--r--plugins/websockets/executor/executor.go4
-rw-r--r--plugins/websockets/origin_test.go3
-rw-r--r--plugins/websockets/plugin.go4
-rw-r--r--plugins/websockets/pool/workers_pool.go10
22 files changed, 96 insertions, 81 deletions
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go
new file mode 100644
index 00000000..46709d71
--- /dev/null
+++ b/plugins/broadcast/interface.go
@@ -0,0 +1,7 @@
+package broadcast
+
+import "github.com/spiral/roadrunner/v2/pkg/pubsub"
+
+type Broadcaster interface {
+ GetDriver(key string) (pubsub.SubReader, error)
+}
diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go
index c43b2e4c..612b6a47 100644
--- a/plugins/broadcast/plugin.go
+++ b/plugins/broadcast/plugin.go
@@ -6,10 +6,10 @@ import (
endure "github.com/spiral/endure/pkg/container"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1beta "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
"google.golang.org/protobuf/proto"
)
@@ -30,8 +30,8 @@ type Plugin struct {
log logger.Logger
// publishers implement Publisher interface
// and able to receive a payload
- publishers map[string]pubsub.PubSub
- providers map[string]pubsub.PSProvider
+ publishers map[string]pubsub.PubSub
+ constructors map[string]pubsub.Constructor
}
func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
@@ -47,7 +47,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
}
p.publishers = make(map[string]pubsub.PubSub)
- p.providers = make(map[string]pubsub.PSProvider)
+ p.constructors = make(map[string]pubsub.Constructor)
p.log = log
p.cfgPlugin = cfg
@@ -64,6 +64,8 @@ func (p *Plugin) Serve() chan error {
continue
}
+ // check type of the v
+ // should be a map[string]interface{}
switch t := v.(type) {
// correct type
case map[string]interface{}:
@@ -81,11 +83,11 @@ func (p *Plugin) Serve() chan error {
switch v.(map[string]interface{})[driver] {
case memory:
- if _, ok := p.providers[memory]; !ok {
+ if _, ok := p.constructors[memory]; !ok {
p.log.Warn("no memory drivers registered", "registered", p.publishers)
continue
}
- ps, err := p.providers[memory].PSProvide(configKey)
+ ps, err := p.constructors[memory].PSConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -94,7 +96,7 @@ func (p *Plugin) Serve() chan error {
// save the pubsub
p.publishers[k] = ps
case redis:
- if _, ok := p.providers[redis]; !ok {
+ if _, ok := p.constructors[redis]; !ok {
p.log.Warn("no redis drivers registered", "registered", p.publishers)
continue
}
@@ -102,7 +104,7 @@ func (p *Plugin) Serve() chan error {
// first - try local configuration
switch {
case p.cfgPlugin.Has(configKey):
- ps, err := p.providers[redis].PSProvide(configKey)
+ ps, err := p.constructors[redis].PSConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -111,7 +113,7 @@ func (p *Plugin) Serve() chan error {
// save the pubsub
p.publishers[k] = ps
case p.cfgPlugin.Has(redis):
- ps, err := p.providers[redis].PSProvide(configKey)
+ ps, err := p.constructors[redis].PSConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -138,9 +140,9 @@ func (p *Plugin) Collects() []interface{} {
}
// CollectPublishers collect all plugins who implement pubsub.Publisher interface
-func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) {
+func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Constructor) {
// key redis, value - interface
- p.providers[name.Name()] = subscriber
+ p.constructors[name.Name()] = subscriber
}
// Publish is an entry point to the websocket PUBSUB
@@ -150,7 +152,7 @@ func (p *Plugin) Publish(m []byte) error {
const op = errors.Op("broadcast_plugin_publish")
- msg := &websocketsv1.Message{}
+ msg := &websocketsv1beta.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
return errors.E(op, err)
@@ -179,7 +181,7 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- msg := &websocketsv1.Message{}
+ msg := &websocketsv1beta.Message{}
err := proto.Unmarshal(m, msg)
if err != nil {
p.log.Error("message unmarshal")
@@ -201,7 +203,7 @@ func (p *Plugin) PublishAsync(m []byte) {
func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) {
const op = errors.Op("broadcast_plugin_get_driver")
// key - driver, default for example
- // we should find `default` in the collected pubsubs providers
+ // we should find `default` in the collected pubsubs constructors
if pub, ok := p.publishers[key]; ok {
return pub, nil
}
diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go
index fa853421..4c27cdc3 100644
--- a/plugins/broadcast/rpc.go
+++ b/plugins/broadcast/rpc.go
@@ -2,8 +2,8 @@ package broadcast
import (
"github.com/spiral/errors"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
"google.golang.org/protobuf/proto"
)
@@ -24,8 +24,7 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro
return nil
}
- r.log.Debug("message published", "msg", in.Messages)
-
+ r.log.Debug("message published", "msg", in.String())
msgLen := len(in.GetMessages())
for i := 0; i < msgLen; i++ {
@@ -56,7 +55,7 @@ func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response)
return nil
}
- r.log.Debug("message published", "msg", in.Messages)
+ r.log.Debug("message published", "msg", in.GetMessages())
msgLen := len(in.GetMessages())
diff --git a/plugins/kv/config.go b/plugins/kv/config.go
index 66095817..09ba79cd 100644
--- a/plugins/kv/config.go
+++ b/plugins/kv/config.go
@@ -1,6 +1,6 @@
package kv
-// Config represents general storage configuration with keys as the user defined kv-names and values as the drivers
+// Config represents general storage configuration with keys as the user defined kv-names and values as the constructors
type Config struct {
Data map[string]interface{} `mapstructure:"kv"`
}
diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go
index 5f4d98b1..4b675271 100644
--- a/plugins/kv/drivers/boltdb/driver.go
+++ b/plugins/kv/drivers/boltdb/driver.go
@@ -9,10 +9,10 @@ import (
"time"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
bolt "go.etcd.io/bbolt"
)
diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go
index 28e2a89c..6ae1a1f6 100644
--- a/plugins/kv/drivers/boltdb/plugin.go
+++ b/plugins/kv/drivers/boltdb/plugin.go
@@ -46,7 +46,7 @@ func (s *Plugin) Stop() error {
return nil
}
-func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
+func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop)
if err != nil {
diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go
index c1f79cbb..a2787d72 100644
--- a/plugins/kv/drivers/memcached/driver.go
+++ b/plugins/kv/drivers/memcached/driver.go
@@ -6,10 +6,10 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
type Driver struct {
diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go
index 936b2047..22ea5cca 100644
--- a/plugins/kv/drivers/memcached/plugin.go
+++ b/plugins/kv/drivers/memcached/plugin.go
@@ -34,7 +34,7 @@ func (s *Plugin) Name() string {
// Available interface implementation
func (s *Plugin) Available() {}
-func (s *Plugin) KVProvide(key string) (kv.Storage, error) {
+func (s *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("boltdb_plugin_provide")
st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin)
if err != nil {
diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go
index fd906041..ffdbbe62 100644
--- a/plugins/kv/interface.go
+++ b/plugins/kv/interface.go
@@ -1,6 +1,6 @@
package kv
-import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
+import kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
// Storage represents single abstract storage.
type Storage interface {
@@ -29,13 +29,8 @@ type Storage interface {
Delete(keys ...string) error
}
-// StorageDriver interface provide storage
-type StorageDriver interface {
- Provider
-}
-
-// Provider provides storage based on the config
-type Provider interface {
- // KVProvide provides Storage based on the config key
- KVProvide(key string) (Storage, error)
+// Constructor provides storage based on the config
+type Constructor interface {
+ // KVConstruct provides Storage based on the config key
+ KVConstruct(key string) (Storage, error)
}
diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go
index 716e0d4c..03dbaed6 100644
--- a/plugins/kv/plugin.go
+++ b/plugins/kv/plugin.go
@@ -24,8 +24,8 @@ const (
// Plugin for the unified storage
type Plugin struct {
log logger.Logger
- // drivers contains general storage drivers, such as boltdb, memory, memcached, redis.
- drivers map[string]StorageDriver
+ // constructors contains general storage constructors, such as boltdb, memory, memcached, redis.
+ constructors map[string]Constructor
// storages contains user-defined storages, such as boltdb-north, memcached-us and so on.
storages map[string]Storage
// KV configuration
@@ -43,7 +43,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error {
if err != nil {
return errors.E(op, err)
}
- p.drivers = make(map[string]StorageDriver, 5)
+ p.constructors = make(map[string]Constructor, 5)
p.storages = make(map[string]Storage, 5)
p.log = log
p.cfgPlugin = cfg
@@ -81,7 +81,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
addr: [ "localhost:11211" ]
- For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
+ For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached
when user requests for example boltdb-south, we should provide that particular preconfigured storage
*/
for k, v := range p.cfg.Data {
@@ -90,9 +90,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
continue
}
- if _, ok := v.(map[string]interface{})[driver]; !ok {
- errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
- return errCh
+ // check type of the v
+ // should be a map[string]interface{}
+ switch t := v.(type) {
+ // correct type
+ case map[string]interface{}:
+ if _, ok := t[driver]; !ok {
+ errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k))
+ return errCh
+ }
+ default:
+ p.log.Warn("wrong type detected in the configuration, please, check yaml indentation")
+ continue
}
// config key for the particular sub-driver kv.memcached
@@ -100,12 +109,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// at this point we know, that driver field present in the configuration
switch v.(map[string]interface{})[driver] {
case memcached:
- if _, ok := p.drivers[memcached]; !ok {
- p.log.Warn("no memcached drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[memcached]; !ok {
+ p.log.Warn("no memcached constructors registered", "registered", p.constructors)
continue
}
- storage, err := p.drivers[memcached].KVProvide(configKey)
+ storage, err := p.constructors[memcached].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -115,12 +124,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
p.storages[k] = storage
case boltdb:
- if _, ok := p.drivers[boltdb]; !ok {
- p.log.Warn("no boltdb drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[boltdb]; !ok {
+ p.log.Warn("no boltdb constructors registered", "registered", p.constructors)
continue
}
- storage, err := p.drivers[boltdb].KVProvide(configKey)
+ storage, err := p.constructors[boltdb].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -129,12 +138,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
case memory:
- if _, ok := p.drivers[memory]; !ok {
- p.log.Warn("no in-memory drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[memory]; !ok {
+ p.log.Warn("no in-memory constructors registered", "registered", p.constructors)
continue
}
- storage, err := p.drivers[memory].KVProvide(configKey)
+ storage, err := p.constructors[memory].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -143,15 +152,15 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
case redis:
- if _, ok := p.drivers[redis]; !ok {
- p.log.Warn("no redis drivers registered", "registered", p.drivers)
+ if _, ok := p.constructors[redis]; !ok {
+ p.log.Warn("no redis constructors registered", "registered", p.constructors)
continue
}
// first - try local configuration
switch {
case p.cfgPlugin.Has(configKey):
- storage, err := p.drivers[redis].KVProvide(configKey)
+ storage, err := p.constructors[redis].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -160,7 +169,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit
// save the storage
p.storages[k] = storage
case p.cfgPlugin.Has(redis):
- storage, err := p.drivers[redis].KVProvide(configKey)
+ storage, err := p.constructors[redis].KVConstruct(configKey)
if err != nil {
errCh <- errors.E(op, err)
return errCh
@@ -194,9 +203,9 @@ func (p *Plugin) Collects() []interface{} {
}
}
-func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) {
- // save the storage driver
- p.drivers[name.Name()] = storage
+func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) {
+ // save the storage constructor
+ p.constructors[name.Name()] = constructor
}
// RPC returns associated rpc service.
diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go
index ab1f7f31..af763600 100644
--- a/plugins/kv/rpc.go
+++ b/plugins/kv/rpc.go
@@ -2,8 +2,8 @@ package kv
import (
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
// Wrapper for the plugin
diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go
index 9b7d7259..1cf031d1 100644
--- a/plugins/memory/kv.go
+++ b/plugins/memory/kv.go
@@ -6,10 +6,10 @@ import (
"time"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
)
type Driver struct {
diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go
index d4d535bf..70badf15 100644
--- a/plugins/memory/plugin.go
+++ b/plugins/memory/plugin.go
@@ -2,7 +2,7 @@ package memory
import (
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -41,11 +41,11 @@ func (p *Plugin) Stop() error {
return nil
}
-func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
+func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
return NewPubSubDriver(p.log, key)
}
-func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("inmemory_plugin_provide")
st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop)
if err != nil {
diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go
index 02246a8f..87638bd8 100644
--- a/plugins/memory/pubsub.go
+++ b/plugins/memory/pubsub.go
@@ -4,9 +4,9 @@ import (
"sync"
"github.com/spiral/roadrunner/v2/pkg/bst"
- "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
"google.golang.org/protobuf/proto"
)
diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go
index ac9ebcc2..0bdd4cf5 100644
--- a/plugins/redis/fanin.go
+++ b/plugins/redis/fanin.go
@@ -4,8 +4,8 @@ import (
"context"
"sync"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
"google.golang.org/protobuf/proto"
"github.com/go-redis/redis/v8"
diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go
index 66cb8384..320b7443 100644
--- a/plugins/redis/kv.go
+++ b/plugins/redis/kv.go
@@ -7,10 +7,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta"
"github.com/spiral/roadrunner/v2/utils"
)
diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go
index 8d997041..9d98790b 100644
--- a/plugins/redis/plugin.go
+++ b/plugins/redis/plugin.go
@@ -5,7 +5,7 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/kv"
"github.com/spiral/roadrunner/v2/plugins/logger"
@@ -59,8 +59,8 @@ func (p *Plugin) Name() string {
// Available interface implementation
func (p *Plugin) Available() {}
-// KVProvide provides KV storage implementation over the redis plugin
-func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
+// KVConstruct provides KV storage implementation over the redis plugin
+func (p *Plugin) KVConstruct(key string) (kv.Storage, error) {
const op = errors.Op("redis_plugin_provide")
st, err := NewRedisDriver(p.log, key, p.cfgPlugin)
if err != nil {
@@ -70,6 +70,6 @@ func (p *Plugin) KVProvide(key string) (kv.Storage, error) {
return st, nil
}
-func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) {
+func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) {
return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh)
}
diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go
index c2a88abe..dc391c20 100644
--- a/plugins/redis/pubsub.go
+++ b/plugins/redis/pubsub.go
@@ -6,10 +6,10 @@ import (
"github.com/go-redis/redis/v8"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/logger"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
"google.golang.org/protobuf/proto"
)
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 799312ad..0583be0c 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -7,12 +7,12 @@ import (
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
)
type Response struct {
diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go
index ec6e1960..bbc49bbb 100644
--- a/plugins/websockets/origin_test.go
+++ b/plugins/websockets/origin_test.go
@@ -9,6 +9,7 @@ import (
func TestConfig_Origin(t *testing.T) {
cfg := &Config{
AllowedOrigin: "*",
+ Broker: "any",
}
err := cfg.InitDefault()
@@ -28,6 +29,7 @@ func TestConfig_Origin(t *testing.T) {
func TestConfig_OriginWildCard(t *testing.T) {
cfg := &Config{
AllowedOrigin: "https://*my.site.com",
+ Broker: "any",
}
err := cfg.InitDefault()
@@ -50,6 +52,7 @@ func TestConfig_OriginWildCard(t *testing.T) {
func TestConfig_OriginWildCard2(t *testing.T) {
cfg := &Config{
AllowedOrigin: "https://my.*.com",
+ Broker: "any",
}
err := cfg.InitDefault()
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index de7443fd..f0b7c6c3 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -10,12 +10,12 @@ import (
"github.com/google/uuid"
json "github.com/json-iterator/go"
"github.com/spiral/errors"
- "github.com/spiral/roadrunner/v2/pkg/interface/broadcast"
- "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/worker"
+ "github.com/spiral/roadrunner/v2/plugins/broadcast"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
"github.com/spiral/roadrunner/v2/plugins/logger"
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index cd9444da..3d95ede0 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,10 +4,10 @@ import (
"sync"
json "github.com/json-iterator/go"
- "github.com/spiral/roadrunner/v2/pkg/interface/pubsub"
- websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
+ websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -105,10 +105,10 @@ func (wp *WorkersPool) do() { //nolint:gocognit
}
// res is a map with a connectionsID
- for topic := range res {
- c, ok := wp.connections.Load(topic)
+ for connID := range res {
+ c, ok := wp.connections.Load(connID)
if !ok {
- wp.log.Warn("the user disconnected connection before the message being written to it", "topics", msg.GetTopics()[i])
+ wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.GetTopics()[i])
wp.put(res)
continue
}