summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
committerValery Piashchynski <[email protected]>2021-06-05 13:21:35 +0300
commitd0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch)
tree7e6ec1a320f596b31f205caee5d5753eaa42f4ff /plugins/websockets
parent0323e070103cc2c30d2cdfb12719d753acafe151 (diff)
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/executor/executor.go26
-rw-r--r--plugins/websockets/plugin.go48
-rw-r--r--plugins/websockets/pool/workers_pool.go63
3 files changed, 87 insertions, 50 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 24ea19ce..69aad7d4 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -12,7 +12,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/commands"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
)
@@ -23,9 +22,8 @@ type Response struct {
type Executor struct {
sync.Mutex
- conn *connection.Connection
- storage *storage.Storage
- log logger.Logger
+ conn *connection.Connection
+ log logger.Logger
// associated connection ID
connID string
@@ -39,12 +37,11 @@ type Executor struct {
}
// NewExecutor creates protected connection and starts command loop
-func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage,
+func NewExecutor(conn *connection.Connection, log logger.Logger,
connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor {
return &Executor{
conn: conn,
connID: connID,
- storage: bst,
log: log,
pubsub: pubsubs,
accessValidator: av,
@@ -175,16 +172,14 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
// associate connection with topics
- err := br.Subscribe(topics...)
+ err := br.Subscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
// in case of error, unsubscribe connection from the dead topics
- _ = br.Unsubscribe(topics...)
+ _ = br.Unsubscribe(e.connID, topics...)
return err
}
- e.storage.InsertMany(e.connID, topics)
-
// save topics for the connection
for i := 0; i < len(topics); i++ {
e.actualTopics[topics[i]] = struct{}{}
@@ -195,8 +190,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error {
func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
// remove associated connections from the storage
- e.storage.RemoveMany(e.connID, topics)
- err := br.Unsubscribe(topics...)
+ err := br.Unsubscribe(e.connID, topics...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
return err
@@ -211,15 +205,15 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error {
}
func (e *Executor) CleanUp() {
+ // unsubscribe particular connection from the topics
for topic := range e.actualTopics {
- // remove from the bst
- e.storage.Remove(e.connID, topic)
-
+ // here
for _, ps := range e.pubsub {
- _ = ps.Unsubscribe(topic)
+ _ = ps.Unsubscribe(e.connID, topic)
}
}
+ // clean up the actualTopics data
for k := range e.actualTopics {
delete(e.actualTopics, k)
}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index fe55d30e..4c0edcad 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -24,7 +24,6 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
- "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
"github.com/spiral/roadrunner/v2/utils"
)
@@ -43,7 +42,6 @@ type Plugin struct {
// global connections map
connections sync.Map
- storage *storage.Storage
// GO workers pool
workersPool *pool.WorkersPool
@@ -73,10 +71,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.pubsubs = make(map[string]pubsub.PubSub)
p.log = log
- p.storage = storage.NewStorage()
- p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log)
p.wsUpgrade = &websocket.Upgrader{
HandshakeTimeout: time.Second * 60,
+ ReadBufferSize: 1024,
+ WriteBufferSize: 1024,
}
p.serveExit = make(chan struct{})
p.server = server
@@ -107,6 +105,8 @@ func (p *Plugin) Serve() chan error {
p.accessValidator = p.defaultAccessValidator(p.phpPool)
}()
+ p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log)
+
// run all pubsubs drivers
for _, v := range p.pubsubs {
go func(ps pubsub.PubSub) {
@@ -133,8 +133,13 @@ func (p *Plugin) Stop() error {
// close workers pool
p.workersPool.Stop()
p.Lock()
+ if p.phpPool == nil {
+ p.Unlock()
+ return nil
+ }
p.phpPool.Destroy(context.Background())
p.Unlock()
+
return nil
}
@@ -206,27 +211,34 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
// store connection
p.connections.Store(connectionID, safeConn)
- defer func() {
- // close the connection on exit
- err = safeConn.Close()
- if err != nil {
- p.log.Error("connection close", "error", err)
- }
-
- // when exiting - delete the connection
- p.connections.Delete(connectionID)
- }()
-
// Executor wraps a connection to have a safe abstraction
- e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.accessValidator, r)
+ e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r)
p.log.Info("websocket client connected", "uuid", connectionID)
- defer e.CleanUp()
err = e.StartCommandLoop()
if err != nil {
p.log.Error("command loop error, disconnecting", "error", err.Error())
return
}
+
+ // when exiting - delete the connection
+ p.connections.Delete(connectionID)
+
+ // remove connection from all topics from all pub-sub drivers
+ e.CleanUp()
+
+ err = r.Body.Close()
+ if err != nil {
+ p.log.Error("body close", "error", err)
+ }
+
+ // close the connection on exit
+ err = safeConn.Close()
+ if err != nil {
+ p.log.Error("connection close", "error", err)
+ }
+
+ safeConn = nil
p.log.Info("disconnected", "connectionID", connectionID)
})
}
@@ -325,8 +337,6 @@ func (p *Plugin) PublishAsync(m []byte) {
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {
return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) {
- p.RLock()
- defer p.RUnlock()
const op = errors.Op("access_validator")
p.log.Debug("validation", "topics", topics)
diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go
index 7fcc873b..544f3ede 100644
--- a/plugins/websockets/pool/workers_pool.go
+++ b/plugins/websockets/pool/workers_pool.go
@@ -1,20 +1,22 @@
package pool
import (
+ "bytes"
"sync"
"github.com/fasthttp/websocket"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub"
"github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/plugins/logger"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
- "github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/utils"
)
type WorkersPool struct {
- storage *storage.Storage
+ storage map[string]pubsub.PubSub
connections *sync.Map
resPool sync.Pool
+ bPool sync.Pool
log logger.Logger
queue chan *message.Message
@@ -22,11 +24,11 @@ type WorkersPool struct {
}
// NewWorkersPool constructs worker pool for the websocket connections
-func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool {
+func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool {
wp := &WorkersPool{
connections: connections,
queue: make(chan *message.Message, 100),
- storage: storage,
+ storage: pubsubs,
log: log,
exit: make(chan struct{}),
}
@@ -34,9 +36,12 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.
wp.resPool.New = func() interface{} {
return make(map[string]struct{}, 10)
}
+ wp.bPool.New = func() interface{} {
+ return new(bytes.Buffer)
+ }
// start 10 workers
- for i := 0; i < 10; i++ {
+ for i := 0; i < 50; i++ {
wp.do()
}
@@ -48,7 +53,7 @@ func (wp *WorkersPool) Queue(msg *message.Message) {
}
func (wp *WorkersPool) Stop() {
- for i := 0; i < 10; i++ {
+ for i := 0; i < 50; i++ {
wp.exit <- struct{}{}
}
@@ -68,6 +73,15 @@ func (wp *WorkersPool) get() map[string]struct{} {
return wp.resPool.Get().(map[string]struct{})
}
+func (wp *WorkersPool) putBytes(b *bytes.Buffer) {
+ b.Reset()
+ wp.bPool.Put(b)
+}
+
+func (wp *WorkersPool) getBytes() *bytes.Buffer {
+ return wp.bPool.Get().(*bytes.Buffer)
+}
+
func (wp *WorkersPool) do() { //nolint:gocognit
go func() {
for {
@@ -83,13 +97,26 @@ func (wp *WorkersPool) do() { //nolint:gocognit
if msg.TopicsLength() == 0 {
continue
}
+
+ br, ok := wp.storage[utils.AsString(msg.Broker())]
+ if !ok {
+ wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage)
+ continue
+ }
+
res := wp.get()
+ bb := wp.getBytes()
+
for i := 0; i < msg.TopicsLength(); i++ {
// get connections for the particular topic
- wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res)
+ br.Connections(utils.AsString(msg.Topics(i)), res)
}
+
if len(res) == 0 {
- wp.log.Info("no such topic", "topic", msg.Topics)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i)))
+ }
+ wp.putBytes(bb)
wp.put(res)
continue
}
@@ -97,24 +124,30 @@ func (wp *WorkersPool) do() { //nolint:gocognit
for i := range res {
c, ok := wp.connections.Load(i)
if !ok {
- wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ }
continue
}
conn := c.(*connection.Connection)
- // TODO sync pool for the bytes
- bb := make([]byte, msg.PayloadLength())
+
+ // put data into the bytes buffer
for i := 0; i < msg.PayloadLength(); i++ {
- bb[i] = byte(msg.Payload(i))
+ bb.WriteByte(byte(msg.Payload(i)))
}
- err := conn.Write(websocket.BinaryMessage, bb)
+ err := conn.Write(websocket.BinaryMessage, bb.Bytes())
if err != nil {
- wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics)
- wp.put(res)
+ for i := 0; i < msg.TopicsLength(); i++ {
+ wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i)))
+ }
continue
}
}
+ // put bytes buffer back
+ wp.putBytes(bb)
+ // put map with results back
wp.put(res)
case <-wp.exit:
wp.log.Info("get exit signal, exiting from the workers pool")