diff options
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 75 |
1 files changed, 37 insertions, 38 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 1a7c6f8a..752ba3ce 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -3,29 +3,29 @@ package pool import ( "sync" - "github.com/fasthttp/websocket" - websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta" + json "github.com/json-iterator/go" "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { - storage map[string]pubsub.PubSub + subscriber pubsub.Subscriber connections *sync.Map resPool sync.Pool log logger.Logger - queue chan *websocketsv1.Message + queue chan *pubsub.Message exit chan struct{} } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *websocketsv1.Message, 100), - storage: pubsubs, + queue: make(chan *pubsub.Message, 100), + subscriber: subscriber, log: log, exit: make(chan struct{}), } @@ -42,7 +42,7 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log return wp } -func (wp *WorkersPool) Queue(msg *websocketsv1.Message) { +func (wp *WorkersPool) Queue(msg *pubsub.Message) { wp.queue <- msg } @@ -67,6 +67,12 @@ func (wp *WorkersPool) get() map[string]struct{} { return wp.resPool.Get().(map[string]struct{}) } +// Response from the server +type Response struct { + Topic string `json:"topic"` + Payload string `json:"payload"` +} + func (wp *WorkersPool) do() { //nolint:gocognit go func() { for { @@ -76,57 +82,50 @@ func (wp *WorkersPool) do() { //nolint:gocognit return } _ = msg - if msg == nil { - continue - } - if len(msg.GetTopics()) == 0 { - continue - } - - br, ok := wp.storage[msg.Broker] - if !ok { - wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage) + if msg == nil || msg.Topic == "" { continue } + // get free map res := wp.get() - for i := 0; i < len(msg.GetTopics()); i++ { - // get connections for the particular topic - br.Connections(msg.GetTopics()[i], res) - } + // get connections for the particular topic + wp.subscriber.Connections(msg.Topic, res) if len(res) == 0 { - for i := 0; i < len(msg.GetTopics()); i++ { - wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) - } + wp.log.Info("no connections associated with provided topic", "topic", msg.Topic) wp.put(res) continue } - for i := range res { - c, ok := wp.connections.Load(i) + // res is a map with a connectionsID + for connID := range res { + c, ok := wp.connections.Load(connID) if !ok { - for i := 0; i < len(msg.GetTopics()); i++ { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) - } + wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.Topic) + wp.put(res) continue } - conn := c.(*connection.Connection) + d, err := json.Marshal(&Response{ + Topic: msg.Topic, + Payload: utils.AsString(msg.Payload), + }) + + if err != nil { + wp.log.Error("error marshaling response", "error", err) + wp.put(res) + break + } // put data into the bytes buffer - err := conn.Write(websocket.BinaryMessage, msg.GetPayload()) + err = c.(*connection.Connection).Write(d) if err != nil { - for i := 0; i < len(msg.GetTopics()); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) - } + wp.log.Error("error sending payload over the connection", "error", err, "topic", msg.Topic) + wp.put(res) continue } } - - // put map with results back - wp.put(res) case <-wp.exit: wp.log.Info("get exit signal, exiting from the workers pool") return |