diff options
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 30 |
1 files changed, 20 insertions, 10 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 87e931d0..8f18580f 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -16,7 +16,7 @@ type WorkersPool struct { resPool sync.Pool log logger.Logger - queue chan pubsub.Message + queue chan *pubsub.Message exit chan struct{} } @@ -24,7 +24,7 @@ type WorkersPool struct { func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan pubsub.Message, 100), + queue: make(chan *pubsub.Message, 100), storage: storage, log: log, exit: make(chan struct{}), @@ -42,7 +42,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger. return wp } -func (wp *WorkersPool) Queue(msg pubsub.Message) { +func (wp *WorkersPool) Queue(msg *pubsub.Message) { wp.queue <- msg } @@ -67,16 +67,26 @@ func (wp *WorkersPool) get() map[string]struct{} { return wp.resPool.Get().(map[string]struct{}) } -func (wp *WorkersPool) do() { +func (wp *WorkersPool) do() { //nolint:gocognit go func() { for { select { - case msg := <-wp.queue: + case msg, ok := <-wp.queue: + if !ok { + return + } + // do not handle nil's + if msg == nil { + continue + } + if len(msg.Topics) == 0 { + continue + } res := wp.get() // get connections for the particular topic - wp.storage.GetByPtr(msg.Topics(), res) + wp.storage.GetByPtr(msg.Topics, res) if len(res) == 0 { - wp.log.Info("no such topic", "topic", msg.Topics()) + wp.log.Info("no such topic", "topic", msg.Topics) wp.put(res) continue } @@ -84,14 +94,14 @@ func (wp *WorkersPool) do() { for i := range res { c, ok := wp.connections.Load(i) if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker(), "topics", msg.Topics()) + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics) continue } conn := c.(*connection.Connection) - err := conn.Write(websocket.BinaryMessage, msg.Payload()) + err := conn.Write(websocket.BinaryMessage, msg.Payload) if err != nil { - wp.log.Error("error sending payload over the connection", "broker", msg.Broker(), "topics", msg.Topics()) + wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics) wp.put(res) continue } |