diff options
author | Valery Piashchynski <[email protected]> | 2021-06-18 12:00:05 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-18 12:00:05 +0300 |
commit | 9e8bad3988c1fec2e545898d529446f7b93e537b (patch) | |
tree | d91159b8c78c8add1981641499ef81c821d5d363 /plugins | |
parent | fe7bb0fe758d573fe353df028257ed66c6eccf66 (diff) |
- Rework finished
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins')
-rw-r--r-- | plugins/broadcast/interface.go | 7 | ||||
-rw-r--r-- | plugins/broadcast/plugin.go | 32 | ||||
-rw-r--r-- | plugins/broadcast/rpc.go | 7 | ||||
-rw-r--r-- | plugins/kv/config.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/driver.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/boltdb/plugin.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/driver.go | 2 | ||||
-rw-r--r-- | plugins/kv/drivers/memcached/plugin.go | 2 | ||||
-rw-r--r-- | plugins/kv/interface.go | 15 | ||||
-rw-r--r-- | plugins/kv/plugin.go | 55 | ||||
-rw-r--r-- | plugins/kv/rpc.go | 2 | ||||
-rw-r--r-- | plugins/memory/kv.go | 2 | ||||
-rw-r--r-- | plugins/memory/plugin.go | 6 | ||||
-rw-r--r-- | plugins/memory/pubsub.go | 4 | ||||
-rw-r--r-- | plugins/redis/fanin.go | 2 | ||||
-rw-r--r-- | plugins/redis/kv.go | 2 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 8 | ||||
-rw-r--r-- | plugins/redis/pubsub.go | 4 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 4 | ||||
-rw-r--r-- | plugins/websockets/origin_test.go | 3 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 4 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 10 |
22 files changed, 96 insertions, 81 deletions
diff --git a/plugins/broadcast/interface.go b/plugins/broadcast/interface.go new file mode 100644 index 00000000..46709d71 --- /dev/null +++ b/plugins/broadcast/interface.go @@ -0,0 +1,7 @@ +package broadcast + +import "github.com/spiral/roadrunner/v2/pkg/pubsub" + +type Broadcaster interface { + GetDriver(key string) (pubsub.SubReader, error) +} diff --git a/plugins/broadcast/plugin.go b/plugins/broadcast/plugin.go index c43b2e4c..612b6a47 100644 --- a/plugins/broadcast/plugin.go +++ b/plugins/broadcast/plugin.go @@ -6,10 +6,10 @@ import ( endure "github.com/spiral/endure/pkg/container" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + websocketsv1beta "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "google.golang.org/protobuf/proto" ) @@ -30,8 +30,8 @@ type Plugin struct { log logger.Logger // publishers implement Publisher interface // and able to receive a payload - publishers map[string]pubsub.PubSub - providers map[string]pubsub.PSProvider + publishers map[string]pubsub.PubSub + constructors map[string]pubsub.Constructor } func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { @@ -47,7 +47,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { } p.publishers = make(map[string]pubsub.PubSub) - p.providers = make(map[string]pubsub.PSProvider) + p.constructors = make(map[string]pubsub.Constructor) p.log = log p.cfgPlugin = cfg @@ -64,6 +64,8 @@ func (p *Plugin) Serve() chan error { continue } + // check type of the v + // should be a map[string]interface{} switch t := v.(type) { // correct type case map[string]interface{}: @@ -81,11 +83,11 @@ func (p *Plugin) Serve() chan error { switch v.(map[string]interface{})[driver] { case memory: - if _, ok := p.providers[memory]; !ok { + if _, ok := p.constructors[memory]; !ok { p.log.Warn("no memory drivers registered", "registered", p.publishers) continue } - ps, err := p.providers[memory].PSProvide(configKey) + ps, err := p.constructors[memory].PSConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -94,7 +96,7 @@ func (p *Plugin) Serve() chan error { // save the pubsub p.publishers[k] = ps case redis: - if _, ok := p.providers[redis]; !ok { + if _, ok := p.constructors[redis]; !ok { p.log.Warn("no redis drivers registered", "registered", p.publishers) continue } @@ -102,7 +104,7 @@ func (p *Plugin) Serve() chan error { // first - try local configuration switch { case p.cfgPlugin.Has(configKey): - ps, err := p.providers[redis].PSProvide(configKey) + ps, err := p.constructors[redis].PSConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -111,7 +113,7 @@ func (p *Plugin) Serve() chan error { // save the pubsub p.publishers[k] = ps case p.cfgPlugin.Has(redis): - ps, err := p.providers[redis].PSProvide(configKey) + ps, err := p.constructors[redis].PSConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -138,9 +140,9 @@ func (p *Plugin) Collects() []interface{} { } // CollectPublishers collect all plugins who implement pubsub.Publisher interface -func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.PSProvider) { +func (p *Plugin) CollectPublishers(name endure.Named, subscriber pubsub.Constructor) { // key redis, value - interface - p.providers[name.Name()] = subscriber + p.constructors[name.Name()] = subscriber } // Publish is an entry point to the websocket PUBSUB @@ -150,7 +152,7 @@ func (p *Plugin) Publish(m []byte) error { const op = errors.Op("broadcast_plugin_publish") - msg := &websocketsv1.Message{} + msg := &websocketsv1beta.Message{} err := proto.Unmarshal(m, msg) if err != nil { return errors.E(op, err) @@ -179,7 +181,7 @@ func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - msg := &websocketsv1.Message{} + msg := &websocketsv1beta.Message{} err := proto.Unmarshal(m, msg) if err != nil { p.log.Error("message unmarshal") @@ -201,7 +203,7 @@ func (p *Plugin) PublishAsync(m []byte) { func (p *Plugin) GetDriver(key string) (pubsub.SubReader, error) { const op = errors.Op("broadcast_plugin_get_driver") // key - driver, default for example - // we should find `default` in the collected pubsubs providers + // we should find `default` in the collected pubsubs constructors if pub, ok := p.publishers[key]; ok { return pub, nil } diff --git a/plugins/broadcast/rpc.go b/plugins/broadcast/rpc.go index fa853421..4c27cdc3 100644 --- a/plugins/broadcast/rpc.go +++ b/plugins/broadcast/rpc.go @@ -2,8 +2,8 @@ package broadcast import ( "github.com/spiral/errors" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "google.golang.org/protobuf/proto" ) @@ -24,8 +24,7 @@ func (r *rpc) Publish(in *websocketsv1.Request, out *websocketsv1.Response) erro return nil } - r.log.Debug("message published", "msg", in.Messages) - + r.log.Debug("message published", "msg", in.String()) msgLen := len(in.GetMessages()) for i := 0; i < msgLen; i++ { @@ -56,7 +55,7 @@ func (r *rpc) PublishAsync(in *websocketsv1.Request, out *websocketsv1.Response) return nil } - r.log.Debug("message published", "msg", in.Messages) + r.log.Debug("message published", "msg", in.GetMessages()) msgLen := len(in.GetMessages()) diff --git a/plugins/kv/config.go b/plugins/kv/config.go index 66095817..09ba79cd 100644 --- a/plugins/kv/config.go +++ b/plugins/kv/config.go @@ -1,6 +1,6 @@ package kv -// Config represents general storage configuration with keys as the user defined kv-names and values as the drivers +// Config represents general storage configuration with keys as the user defined kv-names and values as the constructors type Config struct { Data map[string]interface{} `mapstructure:"kv"` } diff --git a/plugins/kv/drivers/boltdb/driver.go b/plugins/kv/drivers/boltdb/driver.go index 5f4d98b1..4b675271 100644 --- a/plugins/kv/drivers/boltdb/driver.go +++ b/plugins/kv/drivers/boltdb/driver.go @@ -9,10 +9,10 @@ import ( "time" "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" bolt "go.etcd.io/bbolt" ) diff --git a/plugins/kv/drivers/boltdb/plugin.go b/plugins/kv/drivers/boltdb/plugin.go index 28e2a89c..6ae1a1f6 100644 --- a/plugins/kv/drivers/boltdb/plugin.go +++ b/plugins/kv/drivers/boltdb/plugin.go @@ -46,7 +46,7 @@ func (s *Plugin) Stop() error { return nil } -func (s *Plugin) KVProvide(key string) (kv.Storage, error) { +func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") st, err := NewBoltDBDriver(s.log, key, s.cfgPlugin, s.stop) if err != nil { diff --git a/plugins/kv/drivers/memcached/driver.go b/plugins/kv/drivers/memcached/driver.go index c1f79cbb..a2787d72 100644 --- a/plugins/kv/drivers/memcached/driver.go +++ b/plugins/kv/drivers/memcached/driver.go @@ -6,10 +6,10 @@ import ( "github.com/bradfitz/gomemcache/memcache" "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) type Driver struct { diff --git a/plugins/kv/drivers/memcached/plugin.go b/plugins/kv/drivers/memcached/plugin.go index 936b2047..22ea5cca 100644 --- a/plugins/kv/drivers/memcached/plugin.go +++ b/plugins/kv/drivers/memcached/plugin.go @@ -34,7 +34,7 @@ func (s *Plugin) Name() string { // Available interface implementation func (s *Plugin) Available() {} -func (s *Plugin) KVProvide(key string) (kv.Storage, error) { +func (s *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("boltdb_plugin_provide") st, err := NewMemcachedDriver(s.log, key, s.cfgPlugin) if err != nil { diff --git a/plugins/kv/interface.go b/plugins/kv/interface.go index fd906041..ffdbbe62 100644 --- a/plugins/kv/interface.go +++ b/plugins/kv/interface.go @@ -1,6 +1,6 @@ package kv -import kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" +import kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" // Storage represents single abstract storage. type Storage interface { @@ -29,13 +29,8 @@ type Storage interface { Delete(keys ...string) error } -// StorageDriver interface provide storage -type StorageDriver interface { - Provider -} - -// Provider provides storage based on the config -type Provider interface { - // KVProvide provides Storage based on the config key - KVProvide(key string) (Storage, error) +// Constructor provides storage based on the config +type Constructor interface { + // KVConstruct provides Storage based on the config key + KVConstruct(key string) (Storage, error) } diff --git a/plugins/kv/plugin.go b/plugins/kv/plugin.go index 716e0d4c..03dbaed6 100644 --- a/plugins/kv/plugin.go +++ b/plugins/kv/plugin.go @@ -24,8 +24,8 @@ const ( // Plugin for the unified storage type Plugin struct { log logger.Logger - // drivers contains general storage drivers, such as boltdb, memory, memcached, redis. - drivers map[string]StorageDriver + // constructors contains general storage constructors, such as boltdb, memory, memcached, redis. + constructors map[string]Constructor // storages contains user-defined storages, such as boltdb-north, memcached-us and so on. storages map[string]Storage // KV configuration @@ -43,7 +43,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { if err != nil { return errors.E(op, err) } - p.drivers = make(map[string]StorageDriver, 5) + p.constructors = make(map[string]Constructor, 5) p.storages = make(map[string]Storage, 5) p.log = log p.cfgPlugin = cfg @@ -81,7 +81,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit addr: [ "localhost:11211" ] - For this config we should have 3 drivers: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached + For this config we should have 3 constructors: memory, boltdb and memcached but 4 KVs: default, boltdb-south, boltdb-north and memcached when user requests for example boltdb-south, we should provide that particular preconfigured storage */ for k, v := range p.cfg.Data { @@ -90,9 +90,18 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit continue } - if _, ok := v.(map[string]interface{})[driver]; !ok { - errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) - return errCh + // check type of the v + // should be a map[string]interface{} + switch t := v.(type) { + // correct type + case map[string]interface{}: + if _, ok := t[driver]; !ok { + errCh <- errors.E(op, errors.Errorf("could not find mandatory driver field in the %s storage", k)) + return errCh + } + default: + p.log.Warn("wrong type detected in the configuration, please, check yaml indentation") + continue } // config key for the particular sub-driver kv.memcached @@ -100,12 +109,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // at this point we know, that driver field present in the configuration switch v.(map[string]interface{})[driver] { case memcached: - if _, ok := p.drivers[memcached]; !ok { - p.log.Warn("no memcached drivers registered", "registered", p.drivers) + if _, ok := p.constructors[memcached]; !ok { + p.log.Warn("no memcached constructors registered", "registered", p.constructors) continue } - storage, err := p.drivers[memcached].KVProvide(configKey) + storage, err := p.constructors[memcached].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -115,12 +124,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit p.storages[k] = storage case boltdb: - if _, ok := p.drivers[boltdb]; !ok { - p.log.Warn("no boltdb drivers registered", "registered", p.drivers) + if _, ok := p.constructors[boltdb]; !ok { + p.log.Warn("no boltdb constructors registered", "registered", p.constructors) continue } - storage, err := p.drivers[boltdb].KVProvide(configKey) + storage, err := p.constructors[boltdb].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -129,12 +138,12 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage case memory: - if _, ok := p.drivers[memory]; !ok { - p.log.Warn("no in-memory drivers registered", "registered", p.drivers) + if _, ok := p.constructors[memory]; !ok { + p.log.Warn("no in-memory constructors registered", "registered", p.constructors) continue } - storage, err := p.drivers[memory].KVProvide(configKey) + storage, err := p.constructors[memory].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -143,15 +152,15 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage case redis: - if _, ok := p.drivers[redis]; !ok { - p.log.Warn("no redis drivers registered", "registered", p.drivers) + if _, ok := p.constructors[redis]; !ok { + p.log.Warn("no redis constructors registered", "registered", p.constructors) continue } // first - try local configuration switch { case p.cfgPlugin.Has(configKey): - storage, err := p.drivers[redis].KVProvide(configKey) + storage, err := p.constructors[redis].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -160,7 +169,7 @@ func (p *Plugin) Serve() chan error { //nolint:gocognit // save the storage p.storages[k] = storage case p.cfgPlugin.Has(redis): - storage, err := p.drivers[redis].KVProvide(configKey) + storage, err := p.constructors[redis].KVConstruct(configKey) if err != nil { errCh <- errors.E(op, err) return errCh @@ -194,9 +203,9 @@ func (p *Plugin) Collects() []interface{} { } } -func (p *Plugin) GetAllStorageDrivers(name endure.Named, storage StorageDriver) { - // save the storage driver - p.drivers[name.Name()] = storage +func (p *Plugin) GetAllStorageDrivers(name endure.Named, constructor Constructor) { + // save the storage constructor + p.constructors[name.Name()] = constructor } // RPC returns associated rpc service. diff --git a/plugins/kv/rpc.go b/plugins/kv/rpc.go index ab1f7f31..af763600 100644 --- a/plugins/kv/rpc.go +++ b/plugins/kv/rpc.go @@ -2,8 +2,8 @@ package kv import ( "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) // Wrapper for the plugin diff --git a/plugins/memory/kv.go b/plugins/memory/kv.go index 9b7d7259..1cf031d1 100644 --- a/plugins/memory/kv.go +++ b/plugins/memory/kv.go @@ -6,10 +6,10 @@ import ( "time" "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" ) type Driver struct { diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index d4d535bf..70badf15 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -2,7 +2,7 @@ package memory import ( "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -41,11 +41,11 @@ func (p *Plugin) Stop() error { return nil } -func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) { +func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { return NewPubSubDriver(p.log, key) } -func (p *Plugin) KVProvide(key string) (kv.Storage, error) { +func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("inmemory_plugin_provide") st, err := NewInMemoryDriver(p.log, key, p.cfgPlugin, p.stop) if err != nil { diff --git a/plugins/memory/pubsub.go b/plugins/memory/pubsub.go index 02246a8f..87638bd8 100644 --- a/plugins/memory/pubsub.go +++ b/plugins/memory/pubsub.go @@ -4,9 +4,9 @@ import ( "sync" "github.com/spiral/roadrunner/v2/pkg/bst" - "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "google.golang.org/protobuf/proto" ) diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index ac9ebcc2..0bdd4cf5 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -4,8 +4,8 @@ import ( "context" "sync" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/plugins/logger" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "google.golang.org/protobuf/proto" "github.com/go-redis/redis/v8" diff --git a/plugins/redis/kv.go b/plugins/redis/kv.go index 66cb8384..320b7443 100644 --- a/plugins/redis/kv.go +++ b/plugins/redis/kv.go @@ -7,10 +7,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - kvv1 "github.com/spiral/roadrunner/v2/pkg/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" + kvv1 "github.com/spiral/roadrunner/v2/proto/kv/v1beta" "github.com/spiral/roadrunner/v2/utils" ) diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 8d997041..9d98790b 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -5,7 +5,7 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/kv" "github.com/spiral/roadrunner/v2/plugins/logger" @@ -59,8 +59,8 @@ func (p *Plugin) Name() string { // Available interface implementation func (p *Plugin) Available() {} -// KVProvide provides KV storage implementation over the redis plugin -func (p *Plugin) KVProvide(key string) (kv.Storage, error) { +// KVConstruct provides KV storage implementation over the redis plugin +func (p *Plugin) KVConstruct(key string) (kv.Storage, error) { const op = errors.Op("redis_plugin_provide") st, err := NewRedisDriver(p.log, key, p.cfgPlugin) if err != nil { @@ -70,6 +70,6 @@ func (p *Plugin) KVProvide(key string) (kv.Storage, error) { return st, nil } -func (p *Plugin) PSProvide(key string) (pubsub.PubSub, error) { +func (p *Plugin) PSConstruct(key string) (pubsub.PubSub, error) { return NewPubSubDriver(p.log, key, p.cfgPlugin, p.stopCh) } diff --git a/plugins/redis/pubsub.go b/plugins/redis/pubsub.go index c2a88abe..dc391c20 100644 --- a/plugins/redis/pubsub.go +++ b/plugins/redis/pubsub.go @@ -6,10 +6,10 @@ import ( "github.com/go-redis/redis/v8" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/logger" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "google.golang.org/protobuf/proto" ) diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 799312ad..0583be0c 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -7,12 +7,12 @@ import ( json "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" ) type Response struct { diff --git a/plugins/websockets/origin_test.go b/plugins/websockets/origin_test.go index ec6e1960..bbc49bbb 100644 --- a/plugins/websockets/origin_test.go +++ b/plugins/websockets/origin_test.go @@ -9,6 +9,7 @@ import ( func TestConfig_Origin(t *testing.T) { cfg := &Config{ AllowedOrigin: "*", + Broker: "any", } err := cfg.InitDefault() @@ -28,6 +29,7 @@ func TestConfig_Origin(t *testing.T) { func TestConfig_OriginWildCard(t *testing.T) { cfg := &Config{ AllowedOrigin: "https://*my.site.com", + Broker: "any", } err := cfg.InitDefault() @@ -50,6 +52,7 @@ func TestConfig_OriginWildCard(t *testing.T) { func TestConfig_OriginWildCard2(t *testing.T) { cfg := &Config{ AllowedOrigin: "https://my.*.com", + Broker: "any", } err := cfg.InitDefault() diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index de7443fd..f0b7c6c3 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -10,12 +10,12 @@ import ( "github.com/google/uuid" json "github.com/json-iterator/go" "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/pkg/interface/broadcast" - "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" "github.com/spiral/roadrunner/v2/pkg/payload" phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/worker" + "github.com/spiral/roadrunner/v2/plugins/broadcast" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" "github.com/spiral/roadrunner/v2/plugins/logger" diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index cd9444da..3d95ede0 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,10 +4,10 @@ import ( "sync" json "github.com/json-iterator/go" - "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/utils" ) @@ -105,10 +105,10 @@ func (wp *WorkersPool) do() { //nolint:gocognit } // res is a map with a connectionsID - for topic := range res { - c, ok := wp.connections.Load(topic) + for connID := range res { + c, ok := wp.connections.Load(connID) if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "topics", msg.GetTopics()[i]) + wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.GetTopics()[i]) wp.put(res) continue } |