summaryrefslogtreecommitdiff
path: root/plugins/websockets/pool/workers_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r--plugins/websockets/pool/workers_pool.go26
1 files changed, 17 insertions, 9 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 8f18580f..7fcc873b 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -4,10 +4,11 @@ import (
"sync"
"github.com/fasthttp/websocket"
- "github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
+ "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
@@ -16,7 +17,7 @@ type WorkersPool struct {
resPool sync.Pool
log logger.Logger
- queue chan *pubsub.Message
+ queue chan *message.Message
exit chan struct{}
}
@@ -24,7 +25,7 @@ type WorkersPool struct {
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan *pubsub.Message, 100),
+ queue: make(chan *message.Message, 100),
storage: storage,
log: log,
exit: make(chan struct{}),
@@ -42,7 +43,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
return wp
}
-func (wp *WorkersPool) Queue(msg *pubsub.Message) {
+func (wp *WorkersPool) Queue(msg *message.Message) {
wp.queue <- msg
}
@@ -75,16 +76,18 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if !ok {
return
}
- // do not handle nil's
+ _ = msg
if msg == nil {
continue
}
- if len(msg.Topics) == 0 {
+ if msg.TopicsLength() == 0 {
continue
}
res := wp.get()
- // get connections for the particular topic
- wp.storage.GetByPtr(msg.Topics, res)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ // get connections for the particular topic
+ wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res)
+ }
if len(res) == 0 {
wp.log.Info("no such topic", "topic", msg.Topics)
wp.put(res)
@@ -99,7 +102,12 @@ func (wp *WorkersPool) do() { //nolint:gocognit
}
conn := c.(*connection.Connection)
- err := conn.Write(websocket.BinaryMessage, msg.Payload)
+ // TODO sync pool for the bytes
+ bb := make([]byte, msg.PayloadLength())
+ for i := 0; i < msg.PayloadLength(); i++ {
+ bb[i] = byte(msg.Payload(i))
+ }
+ err := conn.Write(websocket.BinaryMessage, bb)
if err != nil {
wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
wp.put(res)