From fe7bb0fe758d573fe353df028257ed66c6eccf66 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Fri, 18 Jun 2021 01:06:16 +0300 Subject: - Rework main parts Signed-off-by: Valery Piashchynski --- plugins/websockets/pool/workers_pool.go | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) (limited to 'plugins/websockets/pool') diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 22042d8d..cd9444da 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,15 +4,15 @@ import ( "sync" json "github.com/json-iterator/go" + "github.com/spiral/roadrunner/v2/pkg/interface/pubsub" websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" - "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { - storage map[string]pubsub.SubReader + subscriber pubsub.Subscriber connections *sync.Map resPool sync.Pool log logger.Logger @@ -22,11 +22,11 @@ type WorkersPool struct { } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(pubsubs map[string]pubsub.SubReader, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, queue: make(chan *websocketsv1.Message, 100), - storage: pubsubs, + subscriber: subscriber, log: log, exit: make(chan struct{}), } @@ -90,19 +90,13 @@ func (wp *WorkersPool) do() { //nolint:gocognit continue } - br, ok := wp.storage[msg.Broker] - if !ok { - wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage) - continue - } - // send a message to every topic for i := 0; i < len(msg.GetTopics()); i++ { // get free map res := wp.get() // get connections for the particular topic - br.Connections(msg.GetTopics()[i], res) + wp.subscriber.Connections(msg.GetTopics()[i], res) if len(res) == 0 { wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) @@ -114,7 +108,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit for topic := range res { c, ok := wp.connections.Load(topic) if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) + wp.log.Warn("the user disconnected connection before the message being written to it", "topics", msg.GetTopics()[i]) wp.put(res) continue } @@ -135,7 +129,7 @@ func (wp *WorkersPool) do() { //nolint:gocognit err = c.(*connection.Connection).Write(d) if err != nil { for i := 0; i < len(msg.GetTopics()); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) + wp.log.Error("error sending payload over the connection", "error", err, "topics", msg.GetTopics()[i]) } wp.put(res) continue -- cgit v1.2.3