summaryrefslogtreecommitdiff
path: root/plugins/websockets/pool/workers_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r--plugins/websockets/pool/workers_pool.go46
1 files changed, 12 insertions, 34 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 544f3ede..efafb2d3 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -1,7 +1,6 @@
package pool
import (
- "bytes"
"sync"
"github.com/fasthttp/websocket"
@@ -9,14 +8,12 @@ import (
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
storage map[string]pubsub.PubSub
connections *sync.Map
resPool sync.Pool
- bPool sync.Pool
log logger.Logger
queue chan *message.Message
@@ -36,9 +33,6 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log
wp.resPool.New = func() interface{} {
return make(map[string]struct{}, 10)
}
- wp.bPool.New = func() interface{} {
- return new(bytes.Buffer)
- }
// start 10 workers
for i := 0; i < 50; i++ {
@@ -73,15 +67,6 @@ func (wp *WorkersPool) get() map[string]struct{} {
return wp.resPool.Get().(map[string]struct{})
}
-func (wp *WorkersPool) putBytes(b *bytes.Buffer) {
- b.Reset()
- wp.bPool.Put(b)
-}
-
-func (wp *WorkersPool) getBytes() *bytes.Buffer {
- return wp.bPool.Get().(*bytes.Buffer)
-}
-
func (wp *WorkersPool) do() { //nolint:gocognit
go func() {
for {
@@ -94,29 +79,27 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if msg == nil {
continue
}
- if msg.TopicsLength() == 0 {
+ if len(msg.GetTopics()) == 0 {
continue
}
- br, ok := wp.storage[utils.AsString(msg.Broker())]
+ br, ok := wp.storage[msg.Broker]
if !ok {
- wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage)
+ wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage)
continue
}
res := wp.get()
- bb := wp.getBytes()
- for i := 0; i < msg.TopicsLength(); i++ {
+ for i := 0; i < len(msg.GetTopics()); i++ {
// get connections for the particular topic
- br.Connections(utils.AsString(msg.Topics(i)), res)
+ br.Connections(msg.GetTopics()[i], res)
}
if len(res) == 0 {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Info("no such topic", "topic", msg.GetTopics()[i])
}
- wp.putBytes(bb)
wp.put(res)
continue
}
@@ -124,8 +107,8 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
@@ -133,20 +116,15 @@ func (wp *WorkersPool) do() { //nolint:gocognit
conn := c.(*connection.Connection)
// put data into the bytes buffer
- for i := 0; i < msg.PayloadLength(); i++ {
- bb.WriteByte(byte(msg.Payload(i)))
- }
- err := conn.Write(websocket.BinaryMessage, bb.Bytes())
+ err := conn.Write(websocket.BinaryMessage, msg.GetPayload())
if err != nil {
- for i := 0; i < msg.TopicsLength(); i++ {
- wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i])
}
continue
}
}
- // put bytes buffer back
- wp.putBytes(bb)
// put map with results back
wp.put(res)
case <-wp.exit: