diff options
author | Valery Piashchynski <[email protected]> | 2021-06-01 00:10:31 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2021-06-01 00:10:31 +0300 |
commit | 548ee4432e48b316ada00feec1a6b89e67ae4f2f (patch) | |
tree | 5cd2aaeeafdb50e3e46824197c721223f54695bf /plugins/websockets/pool | |
parent | 8cd696bbca8fac2ced30d8172c41b7434ec86650 (diff) | |
parent | df4d316d519cea6dff654bd917521a616a37f769 (diff) |
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
#660 feat(plugin): `broadcast` and `broadcast-ws` plugins update to RR2
Diffstat (limited to 'plugins/websockets/pool')
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go new file mode 100644 index 00000000..8f18580f --- /dev/null +++ b/plugins/websockets/pool/workers_pool.go @@ -0,0 +1,117 @@ +package pool + +import ( + "sync" + + "github.com/fasthttp/websocket" + "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/plugins/logger" + "github.com/spiral/roadrunner/v2/plugins/websockets/connection" + "github.com/spiral/roadrunner/v2/plugins/websockets/storage" +) + +type WorkersPool struct { + storage *storage.Storage + connections *sync.Map + resPool sync.Pool + log logger.Logger + + queue chan *pubsub.Message + exit chan struct{} +} + +// NewWorkersPool constructs worker pool for the websocket connections +func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { + wp := &WorkersPool{ + connections: connections, + queue: make(chan *pubsub.Message, 100), + storage: storage, + log: log, + exit: make(chan struct{}), + } + + wp.resPool.New = func() interface{} { + return make(map[string]struct{}, 10) + } + + // start 10 workers + for i := 0; i < 10; i++ { + wp.do() + } + + return wp +} + +func (wp *WorkersPool) Queue(msg *pubsub.Message) { + wp.queue <- msg +} + +func (wp *WorkersPool) Stop() { + for i := 0; i < 10; i++ { + wp.exit <- struct{}{} + } + + close(wp.exit) +} + +func (wp *WorkersPool) put(res map[string]struct{}) { + // optimized + // https://go-review.googlesource.com/c/go/+/110055/ + // not O(n), but O(1) + for k := range res { + delete(res, k) + } +} + +func (wp *WorkersPool) get() map[string]struct{} { + return wp.resPool.Get().(map[string]struct{}) +} + +func (wp *WorkersPool) do() { //nolint:gocognit + go func() { + for { + select { + case msg, ok := <-wp.queue: + if !ok { + return + } + // do not handle nil's + if msg == nil { + continue + } + if len(msg.Topics) == 0 { + continue + } + res := wp.get() + // get connections for the particular topic + wp.storage.GetByPtr(msg.Topics, res) + if len(res) == 0 { + wp.log.Info("no such topic", "topic", msg.Topics) + wp.put(res) + continue + } + + for i := range res { + c, ok := wp.connections.Load(i) + if !ok { + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics) + continue + } + + conn := c.(*connection.Connection) + err := conn.Write(websocket.BinaryMessage, msg.Payload) + if err != nil { + wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics) + wp.put(res) + continue + } + } + + wp.put(res) + case <-wp.exit: + wp.log.Info("get exit signal, exiting from the workers pool") + return + } + } + }() +} |