diff options
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 46 |
1 files changed, 12 insertions, 34 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 544f3ede..efafb2d3 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -1,7 +1,6 @@ package pool import ( - "bytes" "sync" "github.com/fasthttp/websocket" @@ -9,14 +8,12 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { storage map[string]pubsub.PubSub connections *sync.Map resPool sync.Pool - bPool sync.Pool log logger.Logger queue chan *message.Message @@ -36,9 +33,6 @@ func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log wp.resPool.New = func() interface{} { return make(map[string]struct{}, 10) } - wp.bPool.New = func() interface{} { - return new(bytes.Buffer) - } // start 10 workers for i := 0; i < 50; i++ { @@ -73,15 +67,6 @@ func (wp *WorkersPool) get() map[string]struct{} { return wp.resPool.Get().(map[string]struct{}) } -func (wp *WorkersPool) putBytes(b *bytes.Buffer) { - b.Reset() - wp.bPool.Put(b) -} - -func (wp *WorkersPool) getBytes() *bytes.Buffer { - return wp.bPool.Get().(*bytes.Buffer) -} - func (wp *WorkersPool) do() { //nolint:gocognit go func() { for { @@ -94,29 +79,27 @@ func (wp *WorkersPool) do() { //nolint:gocognit if msg == nil { continue } - if msg.TopicsLength() == 0 { + if len(msg.GetTopics()) == 0 { continue } - br, ok := wp.storage[utils.AsString(msg.Broker())] + br, ok := wp.storage[msg.Broker] if !ok { - wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage) + wp.log.Warn("no such broker", "requested", msg.GetBroker(), "available", wp.storage) continue } res := wp.get() - bb := wp.getBytes() - for i := 0; i < msg.TopicsLength(); i++ { + for i := 0; i < len(msg.GetTopics()); i++ { // get connections for the particular topic - br.Connections(utils.AsString(msg.Topics(i)), res) + br.Connections(msg.GetTopics()[i], res) } if len(res) == 0 { - for i := 0; i < msg.TopicsLength(); i++ { - wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i))) + for i := 0; i < len(msg.GetTopics()); i++ { + wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) } - wp.putBytes(bb) wp.put(res) continue } @@ -124,8 +107,8 @@ func (wp *WorkersPool) do() { //nolint:gocognit for i := range res { c, ok := wp.connections.Load(i) if !ok { - for i := 0; i < msg.TopicsLength(); i++ { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + for i := 0; i < len(msg.GetTopics()); i++ { + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) } continue } @@ -133,20 +116,15 @@ func (wp *WorkersPool) do() { //nolint:gocognit conn := c.(*connection.Connection) // put data into the bytes buffer - for i := 0; i < msg.PayloadLength(); i++ { - bb.WriteByte(byte(msg.Payload(i))) - } - err := conn.Write(websocket.BinaryMessage, bb.Bytes()) + err := conn.Write(websocket.BinaryMessage, msg.GetPayload()) if err != nil { - for i := 0; i < msg.TopicsLength(); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + for i := 0; i < len(msg.GetTopics()); i++ { + wp.log.Error("error sending payload over the connection", "error", err, "broker", msg.GetBroker(), "topics", msg.GetTopics()[i]) } continue } } - // put bytes buffer back - wp.putBytes(bb) // put map with results back wp.put(res) case <-wp.exit: |