diff options
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 8f18580f..7fcc873b 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -4,10 +4,11 @@ import ( "sync" "github.com/fasthttp/websocket" - "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/storage" + "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { @@ -16,7 +17,7 @@ type WorkersPool struct { resPool sync.Pool log logger.Logger - queue chan *pubsub.Message + queue chan *message.Message exit chan struct{} } @@ -24,7 +25,7 @@ type WorkersPool struct { func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *pubsub.Message, 100), + queue: make(chan *message.Message, 100), storage: storage, log: log, exit: make(chan struct{}), @@ -42,7 +43,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger. return wp } -func (wp *WorkersPool) Queue(msg *pubsub.Message) { +func (wp *WorkersPool) Queue(msg *message.Message) { wp.queue <- msg } @@ -75,16 +76,18 @@ func (wp *WorkersPool) do() { //nolint:gocognit if !ok { return } - // do not handle nil's + _ = msg if msg == nil { continue } - if len(msg.Topics) == 0 { + if msg.TopicsLength() == 0 { continue } res := wp.get() - // get connections for the particular topic - wp.storage.GetByPtr(msg.Topics, res) + for i := 0; i < msg.TopicsLength(); i++ { + // get connections for the particular topic + wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res) + } if len(res) == 0 { wp.log.Info("no such topic", "topic", msg.Topics) wp.put(res) @@ -99,7 +102,12 @@ func (wp *WorkersPool) do() { //nolint:gocognit } conn := c.(*connection.Connection) - err := conn.Write(websocket.BinaryMessage, msg.Payload) + // TODO sync pool for the bytes + bb := make([]byte, msg.PayloadLength()) + for i := 0; i < msg.PayloadLength(); i++ { + bb[i] = byte(msg.Payload(i)) + } + err := conn.Write(websocket.BinaryMessage, bb) if err != nil { wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics) wp.put(res) |