diff options
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 90 |
1 files changed, 37 insertions, 53 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index a196d1f0..752ba3ce 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,7 +4,6 @@ import ( "sync" json "github.com/json-iterator/go" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" @@ -12,21 +11,21 @@ import ( ) type WorkersPool struct { - storage map[string]pubsub.PubSub + subscriber pubsub.Subscriber connections *sync.Map resPool sync.Pool log logger.Logger - queue chan *websocketsv1.Message + queue chan *pubsub.Message exit chan struct{} } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *websocketsv1.Message, 100), - storage: pubsubs, + queue: make(chan *pubsub.Message, 100), + subscriber: subscriber, log: log, exit: make(chan struct{}), } @@ -43,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log return wp } -func (wp *WorkersPool) Queue(msg *websocketsv1.Message) { +func (wp *WorkersPool) Queue(msg *pubsub.Message) { wp.queue <- msg } @@ -83,63 +82,48 @@ func (wp *WorkersPool) do() { //nolint:gocognit return } _ = msg - if msg == nil { - continue - } - if len(msg.GetTopics()) == 0 { + if msg == nil || msg.Topic == "" { continue } - br, ok := wp.storage[msg.Broker] - if !ok { - wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage) + // get free map + res := wp.get() + + // get connections for the particular topic + wp.subscriber.Connections(msg.Topic, res) + + if len(res) == 0 { + wp.log.Info("no connections associated with provided topic", "topic", msg.Topic) + wp.put(res) continue } - // send a message to every topic - for i := 0; i < len(msg.GetTopics()); i++ { - // get free map - res := wp.get() + // res is a map with a connectionsID + for connID := range res { + c, ok := wp.connections.Load(connID) + if !ok { + wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.Topic) + wp.put(res) + continue + } - // get connections for the particular topic - br.Connections(msg.GetTopics()[i], res) + d, err := json.Marshal(&Response{ + Topic: msg.Topic, + Payload: utils.AsString(msg.Payload), + }) - if len(res) == 0 { - wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) + if err != nil { + wp.log.Error("error marshaling response", "error", err) wp.put(res) - continue + break } - // res is a map with a connectionsID - for topic := range res { - c, ok := wp.connections.Load(topic) - if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) - wp.put(res) - continue - } - - response := &Response{ - Topic: msg.GetTopics()[i], - Payload: utils.AsString(msg.GetPayload()), - } - - d, err := json.Marshal(response) - if err != nil { - wp.log.Error("error marshaling response", "error", err) - wp.put(res) - break - } - - // put data into the bytes buffer - err = c.(*connection.Connection).Write(d) - if err != nil { - for i := 0; i < len(msg.GetTopics()); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) - } - wp.put(res) - continue - } + // put data into the bytes buffer + err = c.(*connection.Connection).Write(d) + if err != nil { + wp.log.Error("error sending payload over the connection", "error", err, "topic", msg.Topic) + wp.put(res) + continue } } case <-wp.exit: |