summaryrefslogtreecommitdiff
path: root/plugins/websockets/pool/workers_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
committerValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
commitd0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch)
tree7e6ec1a320f596b31f205caee5d5753eaa42f4ff /plugins/websockets/pool/workers_pool.go
parent0323e070103cc2c30d2cdfb12719d753acafe151 (diff)
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r--plugins/websockets/pool/workers_pool.go63
1 files changed, 48 insertions, 15 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 7fcc873b..544f3ede 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -1,20 +1,22 @@
package pool
import (
+ "bytes"
"sync"
"github.com/fasthttp/websocket"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
- storage *storage.Storage
+ storage map[string]pubsub.PubSub
connections *sync.Map
resPool sync.Pool
+ bPool sync.Pool
log logger.Logger
queue chan *message.Message
@@ -22,11 +24,11 @@ type WorkersPool struct {
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
queue: make(chan *message.Message, 100),
- storage: storage,
+ storage: pubsubs,
log: log,
exit: make(chan struct{}),
}
@@ -34,9 +36,12 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
wp.resPool.New = func() interface{} {
return make(map[string]struct{}, 10)
}
+ wp.bPool.New = func() interface{} {
+ return new(bytes.Buffer)
+ }
// start 10 workers
- for i := 0; i < 10; i++ {
+ for i := 0; i < 50; i++ {
wp.do()
}
@@ -48,7 +53,7 @@ func (wp *WorkersPool) Queue(msg *message.Message) {
}
func (wp *WorkersPool) Stop() {
- for i := 0; i < 10; i++ {
+ for i := 0; i < 50; i++ {
wp.exit <- struct{}{}
}
@@ -68,6 +73,15 @@ func (wp *WorkersPool) get() map[string]struct{} {
return wp.resPool.Get().(map[string]struct{})
}
+func (wp *WorkersPool) putBytes(b *bytes.Buffer) {
+ b.Reset()
+ wp.bPool.Put(b)
+}
+
+func (wp *WorkersPool) getBytes() *bytes.Buffer {
+ return wp.bPool.Get().(*bytes.Buffer)
+}
+
func (wp *WorkersPool) do() { //nolint:gocognit
go func() {
for {
@@ -83,13 +97,26 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if msg.TopicsLength() == 0 {
continue
}
+
+ br, ok := wp.storage[utils.AsString(msg.Broker())]
+ if !ok {
+ wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage)
+ continue
+ }
+
res := wp.get()
+ bb := wp.getBytes()
+
for i := 0; i < msg.TopicsLength(); i++ {
// get connections for the particular topic
- wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res)
+ br.Connections(utils.AsString(msg.Topics(i)), res)
}
+
if len(res) == 0 {
- wp.log.Info("no such topic", "topic", msg.Topics)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i)))
+ }
+ wp.putBytes(bb)
wp.put(res)
continue
}
@@ -97,24 +124,30 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ }
continue
}
conn := c.(*connection.Connection)
- // TODO sync pool for the bytes
- bb := make([]byte, msg.PayloadLength())
+
+ // put data into the bytes buffer
for i := 0; i < msg.PayloadLength(); i++ {
- bb[i] = byte(msg.Payload(i))
+ bb.WriteByte(byte(msg.Payload(i)))
}
- err := conn.Write(websocket.BinaryMessage, bb)
+ err := conn.Write(websocket.BinaryMessage, bb.Bytes())
if err != nil {
- wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
- wp.put(res)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ }
continue
}
}
+ // put bytes buffer back
+ wp.putBytes(bb)
+ // put map with results back
wp.put(res)
case <-wp.exit:
wp.log.Info("get exit signal, exiting from the workers pool")