diff options
author | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-20 16:40:14 +0300 |
commit | 2dd30155de6faaf6005027d5337a840310c827f9 (patch) | |
tree | aa6f0ce2d2db2047b7e729b16dd70d721f4bae55 /plugins/websockets | |
parent | 25dfc0d837827d0d1c729d323dd651ca6163fe09 (diff) |
- Update redis/memory pubsubs
- Rework internal message bus
- Add new tests for the broadcast plugin and include them into the GA
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 80 |
1 files changed, 35 insertions, 45 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 3d95ede0..00e053ec 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -7,7 +7,6 @@ import ( "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - websocketsv1 "github.com/spiral/roadrunner/v2/proto/websockets/v1beta" "github.com/spiral/roadrunner/v2/utils" ) @@ -17,7 +16,7 @@ type WorkersPool struct { resPool sync.Pool log logger.Logger - queue chan *websocketsv1.Message + queue chan *pubsub.Message exit chan struct{} } @@ -25,7 +24,7 @@ type WorkersPool struct { func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, - queue: make(chan *websocketsv1.Message, 100), + queue: make(chan *pubsub.Message, 100), subscriber: subscriber, log: log, exit: make(chan struct{}), @@ -43,7 +42,7 @@ func NewWorkersPool(subscriber pubsub.Subscriber, connections *sync.Map, log log return wp } -func (wp *WorkersPool) Queue(msg *websocketsv1.Message) { +func (wp *WorkersPool) Queue(msg *pubsub.Message) { wp.queue <- msg } @@ -83,57 +82,48 @@ func (wp *WorkersPool) do() { //nolint:gocognit return } _ = msg - if msg == nil { + if msg == nil || msg.Topic == "" { continue } - if len(msg.GetTopics()) == 0 { + + // get free map + res := wp.get() + + // get connections for the particular topic + wp.subscriber.Connections(msg.Topic, res) + + if len(res) == 0 { + wp.log.Info("no such topic", "topic", msg.Topic) + wp.put(res) continue } - // send a message to every topic - for i := 0; i < len(msg.GetTopics()); i++ { - // get free map - res := wp.get() + // res is a map with a connectionsID + for connID := range res { + c, ok := wp.connections.Load(connID) + if !ok { + wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.Topic) + wp.put(res) + continue + } - // get connections for the particular topic - wp.subscriber.Connections(msg.GetTopics()[i], res) + d, err := json.Marshal(&Response{ + Topic: msg.Topic, + Payload: utils.AsString(msg.Payload), + }) - if len(res) == 0 { - wp.log.Info("no such topic", "topic", msg.GetTopics()[i]) + if err != nil { + wp.log.Error("error marshaling response", "error", err) wp.put(res) - continue + break } - // res is a map with a connectionsID - for connID := range res { - c, ok := wp.connections.Load(connID) - if !ok { - wp.log.Warn("the websocket disconnected before the message being written to it", "topics", msg.GetTopics()[i]) - wp.put(res) - continue - } - - response := &Response{ - Topic: msg.GetTopics()[i], - Payload: utils.AsString(msg.GetPayload()), - } - - d, err := json.Marshal(response) - if err != nil { - wp.log.Error("error marshaling response", "error", err) - wp.put(res) - break - } - - // put data into the bytes buffer - err = c.(*connection.Connection).Write(d) - if err != nil { - for i := 0; i < len(msg.GetTopics()); i++ { - wp.log.Error("error sending payload over the connection", "error", err, "topics", msg.GetTopics()[i]) - } - wp.put(res) - continue - } + // put data into the bytes buffer + err = c.(*connection.Connection).Write(d) + if err != nil { + wp.log.Error("error sending payload over the connection", "error", err, "topic", msg.Topic) + wp.put(res) + continue } } case <-wp.exit: |