diff options
author | Valery Piashchynski <[email protected]> | 2021-06-18 01:06:16 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-18 01:06:16 +0300 |
commit | fe7bb0fe758d573fe353df028257ed66c6eccf66 (patch) | |
tree | 74392f8e61e96c85f0d8b684cfc08e3fc3664ae9 /plugins/websockets/pool | |
parent | 68ff941c4226074206ceed9c30bd95317aa0e9fc (diff) |
- Rework main parts
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/pool')
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 20 |
1 files changed, 7 insertions, 13 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 22042d8d..cd9444da 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,15 +4,15 @@ import ( "sync" json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { - storage map[string]pubsub.SubReader + subscriber pubsub.Subscriber connections *sync.Map resPool sync.Pool log logger.Logger @@ -22,11 +22,11 @@ type WorkersPool struct { } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(pubsubs map[string]pubsub.SubReader, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, queue: make(chan *websocketsv1.Message, 100), - storage: pubsubs, + subscriber: subscriber, log: log, exit: make(chan struct{}), } @@ -90,19 +90,13 @@ func (wp *WorkersPool) do() { //nolint:gocognit continue } - br, ok := wp.storage[msg.Broker] - if !ok { - wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage) - continue - } - // send a message to every topic for i := 0; i < len(msg.GetTopics()); i++ { // get free map res := wp.get() // get connections for the particular topic - br.Connections(msg.GetTopics()[i], res) + wp.subscriber.Connections(msg.GetTopics()[i], res) if len(res) == 0 { wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) @@ -114,7 +108,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit for topic := range res { c, ok := wp.connections.Load(topic) if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) + wp.log.Warn("the user disconnected connection before the message being written to it", "topics", msg.GetTopics()[i]) wp.put(res) continue } @@ -135,7 +129,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit err = c.(*connection.Connection).Write(d) if err != nil { for i := 0; i < len(msg.GetTopics()); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) + wp.log.Error("error sending payload over the connection", "error", err, "topics", msg.GetTopics()[i]) } wp.put(res) continue |