summaryrefslogtreecommitdiff
path: root/plugins/websockets/pool/workers_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/pool/workers_pool.go')
-rw-r--r--plugins/websockets/pool/workers_pool.go30
1 files changed, 20 insertions, 10 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 87e931d0..8f18580f 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -16,7 +16,7 @@ type WorkersPool struct {
resPool sync.Pool
log logger.Logger
- queue chan pubsub.Message
+ queue chan *pubsub.Message
exit chan struct{}
}
@@ -24,7 +24,7 @@ type WorkersPool struct {
func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
- queue: make(chan pubsub.Message, 100),
+ queue: make(chan *pubsub.Message, 100),
storage: storage,
log: log,
exit: make(chan struct{}),
@@ -42,7 +42,7 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
return wp
}
-func (wp *WorkersPool) Queue(msg pubsub.Message) {
+func (wp *WorkersPool) Queue(msg *pubsub.Message) {
wp.queue <- msg
}
@@ -67,16 +67,26 @@ func (wp *WorkersPool) get() map[string]struct{} {
return wp.resPool.Get().(map[string]struct{})
}
-func (wp *WorkersPool) do() {
+func (wp *WorkersPool) do() { //nolint:gocognit
go func() {
for {
select {
- case msg := <-wp.queue:
+ case msg, ok := <-wp.queue:
+ if !ok {
+ return
+ }
+ // do not handle nil's
+ if msg == nil {
+ continue
+ }
+ if len(msg.Topics) == 0 {
+ continue
+ }
res := wp.get()
// get connections for the particular topic
- wp.storage.GetByPtr(msg.Topics(), res)
+ wp.storage.GetByPtr(msg.Topics, res)
if len(res) == 0 {
- wp.log.Info("no such topic", "topic", msg.Topics())
+ wp.log.Info("no such topic", "topic", msg.Topics)
wp.put(res)
continue
}
@@ -84,14 +94,14 @@ func (wp *WorkersPool) do() {
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker(), "topics", msg.Topics())
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics)
continue
}
conn := c.(*connection.Connection)
- err := conn.Write(websocket.BinaryMessage, msg.Payload())
+ err := conn.Write(websocket.BinaryMessage, msg.Payload)
if err != nil {
- wp.log.Error("error sending payload over the connection", "broker", msg.Broker(), "topics", msg.Topics())
+ wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
wp.put(res)
continue
}