diff options
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 63 |
1 files changed, 48 insertions, 15 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 7fcc873b..544f3ede 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -1,20 +1,22 @@ package pool import ( + "bytes" "sync" "github.com/fasthttp/websocket" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { - storage *storage.Storage + storage map[string]pubsub.PubSub connections *sync.Map resPool sync.Pool + bPool sync.Pool log logger.Logger queue chan *message.Message @@ -22,11 +24,11 @@ type WorkersPool struct { } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, queue: make(chan *message.Message, 100), - storage: storage, + storage: pubsubs, log: log, exit: make(chan struct{}), } @@ -34,9 +36,12 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger. wp.resPool.New = func() interface{} { return make(map[string]struct{}, 10) } + wp.bPool.New = func() interface{} { + return new(bytes.Buffer) + } // start 10 workers - for i := 0; i < 10; i++ { + for i := 0; i < 50; i++ { wp.do() } @@ -48,7 +53,7 @@ func (wp *WorkersPool) Queue(msg *message.Message) { } func (wp *WorkersPool) Stop() { - for i := 0; i < 10; i++ { + for i := 0; i < 50; i++ { wp.exit <- struct{}{} } @@ -68,6 +73,15 @@ func (wp *WorkersPool) get() map[string]struct{} { return wp.resPool.Get().(map[string]struct{}) } +func (wp *WorkersPool) putBytes(b *bytes.Buffer) { + b.Reset() + wp.bPool.Put(b) +} + +func (wp *WorkersPool) getBytes() *bytes.Buffer { + return wp.bPool.Get().(*bytes.Buffer) +} + func (wp *WorkersPool) do() { //nolint:gocognit go func() { for { @@ -83,13 +97,26 @@ func (wp *WorkersPool) do() { //nolint:gocognit if msg.TopicsLength() == 0 { continue } + + br, ok := wp.storage[utils.AsString(msg.Broker())] + if !ok { + wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage) + continue + } + res := wp.get() + bb := wp.getBytes() + for i := 0; i < msg.TopicsLength(); i++ { // get connections for the particular topic - wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res) + br.Connections(utils.AsString(msg.Topics(i)), res) } + if len(res) == 0 { - wp.log.Info("no such topic", "topic", msg.Topics) + for i := 0; i < msg.TopicsLength(); i++ { + wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i))) + } + wp.putBytes(bb) wp.put(res) continue } @@ -97,24 +124,30 @@ func (wp *WorkersPool) do() { //nolint:gocognit for i := range res { c, ok := wp.connections.Load(i) if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics) + for i := 0; i < msg.TopicsLength(); i++ { + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + } continue } conn := c.(*connection.Connection) - // TODO sync pool for the bytes - bb := make([]byte, msg.PayloadLength()) + + // put data into the bytes buffer for i := 0; i < msg.PayloadLength(); i++ { - bb[i] = byte(msg.Payload(i)) + bb.WriteByte(byte(msg.Payload(i))) } - err := conn.Write(websocket.BinaryMessage, bb) + err := conn.Write(websocket.BinaryMessage, bb.Bytes()) if err != nil { - wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics) - wp.put(res) + for i := 0; i < msg.TopicsLength(); i++ { + wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + } continue } } + // put bytes buffer back + wp.putBytes(bb) + // put map with results back wp.put(res) case <-wp.exit: wp.log.Info("get exit signal, exiting from the workers pool") |