diff options
author | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-05 13:21:35 +0300 |
commit | d0ec24066b5fbb4e3accc9c45f72f7c638b35dba (patch) | |
tree | 7e6ec1a320f596b31f205caee5d5753eaa42f4ff | |
parent | 0323e070103cc2c30d2cdfb12719d753acafe151 (diff) |
- Websockets bug fixing and polishing
Signed-off-by: Valery Piashchynski <[email protected]>
-rw-r--r-- | pkg/bst/bst.go | 16 | ||||
-rw-r--r-- | pkg/bst/interface.go | 2 | ||||
-rw-r--r-- | pkg/pubsub/interface.go | 13 | ||||
-rw-r--r-- | plugins/http/plugin.go | 6 | ||||
-rw-r--r-- | plugins/memory/plugin.go | 45 | ||||
-rw-r--r-- | plugins/redis/fanin.go | 13 | ||||
-rw-r--r-- | plugins/redis/plugin.go | 75 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 26 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 48 | ||||
-rw-r--r-- | plugins/websockets/pool/workers_pool.go | 63 |
10 files changed, 229 insertions, 78 deletions
diff --git a/pkg/bst/bst.go b/pkg/bst/bst.go index 664937ba..f8426b12 100644 --- a/pkg/bst/bst.go +++ b/pkg/bst/bst.go @@ -52,6 +52,22 @@ func (b *BST) Insert(uuid string, topic string) { } } +func (b *BST) Contains(topic string) bool { + curr := b + for curr != nil { + switch { + case topic < curr.topic: + curr = curr.left + case topic > curr.topic: + curr = curr.right + case topic == curr.topic: + return true + } + } + + return false +} + func (b *BST) Get(topic string) map[string]struct{} { curr := b for curr != nil { diff --git a/pkg/bst/interface.go b/pkg/bst/interface.go index ecf40414..95b03e11 100644 --- a/pkg/bst/interface.go +++ b/pkg/bst/interface.go @@ -8,4 +8,6 @@ type Storage interface { Remove(uuid, topic string) // Get will return all connections associated with the topic Get(topic string) map[string]struct{} + // Contains checks if the BST contains a topic + Contains(topic string) bool } diff --git a/pkg/pubsub/interface.go b/pkg/pubsub/interface.go index 18c1a80c..2d5d9595 100644 --- a/pkg/pubsub/interface.go +++ b/pkg/pubsub/interface.go @@ -2,6 +2,10 @@ package pubsub import "github.com/spiral/roadrunner/v2/pkg/pubsub/message" +/* +This interface is in BETA. It might be changed. +*/ + // PubSub ... type PubSub interface { Publisher @@ -10,15 +14,20 @@ type PubSub interface { } // Subscriber defines the ability to operate as message passing broker. +// BETA interface type Subscriber interface { // Subscribe broker to one or multiple topics. - Subscribe(topics ...string) error + Subscribe(connectionID string, topics ...string) error // Unsubscribe from one or multiply topics - Unsubscribe(topics ...string) error + Unsubscribe(connectionID string, topics ...string) error + + // Connections returns all connections associated with the particular topic + Connections(topic string, ret map[string]struct{}) } // Publisher publish one or more messages +// BETA interface type Publisher interface { // Publish one or multiple Channel. Publish(messages []byte) error diff --git a/plugins/http/plugin.go b/plugins/http/plugin.go index 1952679a..ba83344a 100644 --- a/plugins/http/plugin.go +++ b/plugins/http/plugin.go @@ -248,6 +248,12 @@ func (p *Plugin) Stop() error { // ServeHTTP handles connection using set of middleware and pool PSR-7 server. func (p *Plugin) ServeHTTP(w http.ResponseWriter, r *http.Request) { + defer func() { + err := r.Body.Close() + if err != nil { + p.log.Error("body close", "error", err) + } + }() if headerContainsUpgrade(r) { http.Error(w, "server does not support upgrade header", http.StatusInternalServerError) return diff --git a/plugins/memory/plugin.go b/plugins/memory/plugin.go index eb87b39e..6732ff5d 100644 --- a/plugins/memory/plugin.go +++ b/plugins/memory/plugin.go @@ -3,6 +3,7 @@ package memory import ( "sync" + "github.com/spiral/roadrunner/v2/pkg/bst" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/utils" @@ -13,17 +14,19 @@ const ( ) type Plugin struct { + sync.RWMutex log logger.Logger // channel with the messages from the RPC pushCh chan []byte // user-subscribed topics - topics sync.Map + storage bst.Storage } func (p *Plugin) Init(log logger.Logger) error { p.log = log - p.pushCh = make(chan []byte, 100) + p.pushCh = make(chan []byte, 10) + p.storage = bst.NewBST() return nil } @@ -35,44 +38,62 @@ func (p *Plugin) Name() string { return PluginName } -func (p *Plugin) Publish(messages []byte) error { - p.pushCh <- messages +func (p *Plugin) Publish(message []byte) error { + p.pushCh <- message return nil } -func (p *Plugin) PublishAsync(messages []byte) { +func (p *Plugin) PublishAsync(message []byte) { go func() { - p.pushCh <- messages + p.pushCh <- message }() } -func (p *Plugin) Subscribe(topics ...string) error { +func (p *Plugin) Subscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() for i := 0; i < len(topics); i++ { - p.topics.Store(topics[i], struct{}{}) + p.storage.Insert(connectionID, topics[i]) } return nil } -func (p *Plugin) Unsubscribe(topics ...string) error { +func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { + p.Lock() + defer p.Unlock() for i := 0; i < len(topics); i++ { - p.topics.Delete(topics[i]) + p.storage.Remove(connectionID, topics[i]) } return nil } +func (p *Plugin) Connections(topic string, res map[string]struct{}) { + p.RLock() + defer p.RUnlock() + + ret := p.storage.Get(topic) + for rr := range ret { + res[rr] = struct{}{} + } +} + func (p *Plugin) Next() (*message.Message, error) { msg := <-p.pushCh - if msg == nil { return nil, nil } + p.RLock() + defer p.RUnlock() + fbsMsg := message.GetRootAsMessage(msg, 0) // push only messages, which are subscribed // TODO better??? for i := 0; i < fbsMsg.TopicsLength(); i++ { - if _, ok := p.topics.Load(utils.AsString(fbsMsg.Topics(i))); ok { + // if we have active subscribers - send a message to a topic + // or send nil instead + if ok := p.storage.Contains(utils.AsString(fbsMsg.Topics(i))); ok { return fbsMsg, nil } } diff --git a/plugins/redis/fanin.go b/plugins/redis/fanin.go index 3082f24f..321bfaaa 100644 --- a/plugins/redis/fanin.go +++ b/plugins/redis/fanin.go @@ -15,6 +15,7 @@ import ( type FanIn struct { sync.Mutex + // redis client client redis.UniversalClient pubsub *redis.PubSub @@ -26,7 +27,7 @@ type FanIn struct { exit chan struct{} } -func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { +func newFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { out := make(chan *message.Message, 100) fi := &FanIn{ out: out, @@ -42,7 +43,7 @@ func NewFanIn(redisClient redis.UniversalClient, log logger.Logger) *FanIn { return fi } -func (fi *FanIn) AddChannel(topics ...string) error { +func (fi *FanIn) sub(topics ...string) error { const op = errors.Op("fanin_addchannel") err := fi.pubsub.Subscribe(context.Background(), topics...) if err != nil { @@ -71,22 +72,22 @@ func (fi *FanIn) read() { } } -func (fi *FanIn) RemoveChannel(topics ...string) error { +func (fi *FanIn) unsub(topic string) error { const op = errors.Op("fanin_remove") - err := fi.pubsub.Unsubscribe(context.Background(), topics...) + err := fi.pubsub.Unsubscribe(context.Background(), topic) if err != nil { return errors.E(op, err) } return nil } -func (fi *FanIn) Stop() error { +func (fi *FanIn) stop() error { fi.exit <- struct{}{} close(fi.out) close(fi.exit) return nil } -func (fi *FanIn) Consume() <-chan *message.Message { +func (fi *FanIn) consume() <-chan *message.Message { return fi.out } diff --git a/plugins/redis/plugin.go b/plugins/redis/plugin.go index 5b9de5fc..7b5721f4 100644 --- a/plugins/redis/plugin.go +++ b/plugins/redis/plugin.go @@ -15,7 +15,7 @@ import ( const PluginName = "redis" type Plugin struct { - sync.Mutex + sync.RWMutex // config for RR integration cfg *Config // logger @@ -23,6 +23,7 @@ type Plugin struct { // redis universal client universalClient redis.UniversalClient + // fanIn implementation used to deliver messages from all channels to the single websocket point fanin *FanIn } @@ -70,7 +71,7 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { }) // init fanin - p.fanin = NewFanIn(p.universalClient, log) + p.fanin = newFanIn(p.universalClient, log) return nil } @@ -82,7 +83,7 @@ func (p *Plugin) Serve() chan error { func (p *Plugin) Stop() error { const op = errors.Op("redis_plugin_stop") - err := p.fanin.Stop() + err := p.fanin.stop() if err != nil { return errors.E(op, err) } @@ -132,15 +133,73 @@ func (p *Plugin) PublishAsync(msg []byte) { }() } -func (p *Plugin) Subscribe(topics ...string) error { - return p.fanin.AddChannel(topics...) +func (p *Plugin) Subscribe(connectionID string, topics ...string) error { + // just add a connection + for i := 0; i < len(topics); i++ { + // key - topic + // value - connectionID + hset := p.universalClient.SAdd(context.Background(), topics[i], connectionID) + res, err := hset.Result() + if err != nil { + return err + } + if res == 0 { + p.log.Warn("could not subscribe to the provided topic", "connectionID", connectionID, "topic", topics[i]) + continue + } + } + + // and subscribe after + return p.fanin.sub(topics...) } -func (p *Plugin) Unsubscribe(topics ...string) error { - return p.fanin.RemoveChannel(topics...) +func (p *Plugin) Unsubscribe(connectionID string, topics ...string) error { + // Remove topics from the storage + for i := 0; i < len(topics); i++ { + srem := p.universalClient.SRem(context.Background(), topics[i], connectionID) + if srem.Err() != nil { + return srem.Err() + } + } + + for i := 0; i < len(topics); i++ { + // if there are no such topics, we can safely unsubscribe from the redis + ssc := p.universalClient.SMembers(context.Background(), topics[i]) + res, err := ssc.Result() + if err != nil { + return err + } + + // if we have associated connections - skip + if len(res) > 0 { + continue + } + + // else - unsubscribe + err = p.fanin.unsub(topics[i]) + if err != nil { + return err + } + } + + return nil +} + +func (p *Plugin) Connections(topic string, res map[string]struct{}) { + hget := p.universalClient.SMembersMap(context.Background(), topic) + r, err := hget.Result() + if err != nil { + panic(err) + } + + // assighn connections + // res expected to be from the sync.Pool + for k := range r { + res[k] = struct{}{} + } } // Next return next message func (p *Plugin) Next() (*message.Message, error) { - return <-p.fanin.Consume(), nil + return <-p.fanin.consume(), nil } diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 24ea19ce..69aad7d4 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -12,7 +12,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/commands" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" ) @@ -23,9 +22,8 @@ type Response struct { type Executor struct { sync.Mutex - conn *connection.Connection - storage *storage.Storage - log logger.Logger + conn *connection.Connection + log logger.Logger // associated connection ID connID string @@ -39,12 +37,11 @@ type Executor struct { } // NewExecutor creates protected connection and starts command loop -func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, +func NewExecutor(conn *connection.Connection, log logger.Logger, connID string, pubsubs map[string]pubsub.PubSub, av validator.AccessValidatorFn, r *http.Request) *Executor { return &Executor{ conn: conn, connID: connID, - storage: bst, log: log, pubsub: pubsubs, accessValidator: av, @@ -175,16 +172,14 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit func (e *Executor) Set(br pubsub.PubSub, topics []string) error { // associate connection with topics - err := br.Subscribe(topics...) + err := br.Subscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) // in case of error, unsubscribe connection from the dead topics - _ = br.Unsubscribe(topics...) + _ = br.Unsubscribe(e.connID, topics...) return err } - e.storage.InsertMany(e.connID, topics) - // save topics for the connection for i := 0; i < len(topics); i++ { e.actualTopics[topics[i]] = struct{}{} @@ -195,8 +190,7 @@ func (e *Executor) Set(br pubsub.PubSub, topics []string) error { func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { // remove associated connections from the storage - e.storage.RemoveMany(e.connID, topics) - err := br.Unsubscribe(topics...) + err := br.Unsubscribe(e.connID, topics...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) return err @@ -211,15 +205,15 @@ func (e *Executor) Leave(br pubsub.PubSub, topics []string) error { } func (e *Executor) CleanUp() { + // unsubscribe particular connection from the topics for topic := range e.actualTopics { - // remove from the bst - e.storage.Remove(e.connID, topic) - + // here for _, ps := range e.pubsub { - _ = ps.Unsubscribe(topic) + _ = ps.Unsubscribe(e.connID, topic) } } + // clean up the actualTopics data for k := range e.actualTopics { delete(e.actualTopics, k) } diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index fe55d30e..4c0edcad 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -24,7 +24,6 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/connection" "github.com/spiral/roadrunner/v2/plugins/websockets/executor" "github.com/spiral/roadrunner/v2/plugins/websockets/pool" - "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" "github.com/spiral/roadrunner/v2/utils" ) @@ -43,7 +42,6 @@ type Plugin struct { // global connections map connections sync.Map - storage *storage.Storage // GO workers pool workersPool *pool.WorkersPool @@ -73,10 +71,10 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se p.pubsubs = make(map[string]pubsub.PubSub) p.log = log - p.storage = storage.NewStorage() - p.workersPool = pool.NewWorkersPool(p.storage, &p.connections, log) p.wsUpgrade = &websocket.Upgrader{ HandshakeTimeout: time.Second * 60, + ReadBufferSize: 1024, + WriteBufferSize: 1024, } p.serveExit = make(chan struct{}) p.server = server @@ -107,6 +105,8 @@ func (p *Plugin) Serve() chan error { p.accessValidator = p.defaultAccessValidator(p.phpPool) }() + p.workersPool = pool.NewWorkersPool(p.pubsubs, &p.connections, p.log) + // run all pubsubs drivers for _, v := range p.pubsubs { go func(ps pubsub.PubSub) { @@ -133,8 +133,13 @@ func (p *Plugin) Stop() error { // close workers pool p.workersPool.Stop() p.Lock() + if p.phpPool == nil { + p.Unlock() + return nil + } p.phpPool.Destroy(context.Background()) p.Unlock() + return nil } @@ -206,27 +211,34 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { // store connection p.connections.Store(connectionID, safeConn) - defer func() { - // close the connection on exit - err = safeConn.Close() - if err != nil { - p.log.Error("connection close", "error", err) - } - - // when exiting - delete the connection - p.connections.Delete(connectionID) - }() - // Executor wraps a connection to have a safe abstraction - e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs, p.accessValidator, r) + e := executor.NewExecutor(safeConn, p.log, connectionID, p.pubsubs, p.accessValidator, r) p.log.Info("websocket client connected", "uuid", connectionID) - defer e.CleanUp() err = e.StartCommandLoop() if err != nil { p.log.Error("command loop error, disconnecting", "error", err.Error()) return } + + // when exiting - delete the connection + p.connections.Delete(connectionID) + + // remove connection from all topics from all pub-sub drivers + e.CleanUp() + + err = r.Body.Close() + if err != nil { + p.log.Error("body close", "error", err) + } + + // close the connection on exit + err = safeConn.Close() + if err != nil { + p.log.Error("connection close", "error", err) + } + + safeConn = nil p.log.Info("disconnected", "connectionID", connectionID) }) } @@ -325,8 +337,6 @@ func (p *Plugin) PublishAsync(m []byte) { func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { return func(r *http.Request, topics ...string) (*validator.AccessValidator, error) { - p.RLock() - defer p.RUnlock() const op = errors.Op("access_validator") p.log.Debug("validation", "topics", topics) diff --git a/plugins/websockets/pool/workers_pool.go b/plugins/websockets/pool/workers_pool.go index 7fcc873b..544f3ede 100644 --- a/plugins/websockets/pool/workers_pool.go +++ b/plugins/websockets/pool/workers_pool.go @@ -1,20 +1,22 @@ package pool import ( + "bytes" "sync" "github.com/fasthttp/websocket" + "github.com/spiral/roadrunner/v2/pkg/pubsub" "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/plugins/logger" "github.com/spiral/roadrunner/v2/plugins/websockets/connection" - "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/utils" ) type WorkersPool struct { - storage *storage.Storage + storage map[string]pubsub.PubSub connections *sync.Map resPool sync.Pool + bPool sync.Pool log logger.Logger queue chan *message.Message @@ -22,11 +24,11 @@ type WorkersPool struct { } // NewWorkersPool constructs worker pool for the websocket connections -func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger.Logger) *WorkersPool { +func NewWorkersPool(pubsubs map[string]pubsub.PubSub, connections *sync.Map, log logger.Logger) *WorkersPool { wp := &WorkersPool{ connections: connections, queue: make(chan *message.Message, 100), - storage: storage, + storage: pubsubs, log: log, exit: make(chan struct{}), } @@ -34,9 +36,12 @@ func NewWorkersPool(storage *storage.Storage, connections *sync.Map, log logger. wp.resPool.New = func() interface{} { return make(map[string]struct{}, 10) } + wp.bPool.New = func() interface{} { + return new(bytes.Buffer) + } // start 10 workers - for i := 0; i < 10; i++ { + for i := 0; i < 50; i++ { wp.do() } @@ -48,7 +53,7 @@ func (wp *WorkersPool) Queue(msg *message.Message) { } func (wp *WorkersPool) Stop() { - for i := 0; i < 10; i++ { + for i := 0; i < 50; i++ { wp.exit <- struct{}{} } @@ -68,6 +73,15 @@ func (wp *WorkersPool) get() map[string]struct{} { return wp.resPool.Get().(map[string]struct{}) } +func (wp *WorkersPool) putBytes(b *bytes.Buffer) { + b.Reset() + wp.bPool.Put(b) +} + +func (wp *WorkersPool) getBytes() *bytes.Buffer { + return wp.bPool.Get().(*bytes.Buffer) +} + func (wp *WorkersPool) do() { //nolint:gocognit go func() { for { @@ -83,13 +97,26 @@ func (wp *WorkersPool) do() { //nolint:gocognit if msg.TopicsLength() == 0 { continue } + + br, ok := wp.storage[utils.AsString(msg.Broker())] + if !ok { + wp.log.Warn("no such broker", "requested", utils.AsString(msg.Broker()), "available", wp.storage) + continue + } + res := wp.get() + bb := wp.getBytes() + for i := 0; i < msg.TopicsLength(); i++ { // get connections for the particular topic - wp.storage.GetOneByPtr(utils.AsString(msg.Topics(i)), res) + br.Connections(utils.AsString(msg.Topics(i)), res) } + if len(res) == 0 { - wp.log.Info("no such topic", "topic", msg.Topics) + for i := 0; i < msg.TopicsLength(); i++ { + wp.log.Info("no such topic", "topic", utils.AsString(msg.Topics(i))) + } + wp.putBytes(bb) wp.put(res) continue } @@ -97,24 +124,30 @@ func (wp *WorkersPool) do() { //nolint:gocognit for i := range res { c, ok := wp.connections.Load(i) if !ok { - wp.log.Warn("the user disconnected connection before the message being written to it", "broker", msg.Broker, "topics", msg.Topics) + for i := 0; i < msg.TopicsLength(); i++ { + wp.log.Warn("the user disconnected connection before the message being written to it", "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + } continue } conn := c.(*connection.Connection) - // TODO sync pool for the bytes - bb := make([]byte, msg.PayloadLength()) + + // put data into the bytes buffer for i := 0; i < msg.PayloadLength(); i++ { - bb[i] = byte(msg.Payload(i)) + bb.WriteByte(byte(msg.Payload(i))) } - err := conn.Write(websocket.BinaryMessage, bb) + err := conn.Write(websocket.BinaryMessage, bb.Bytes()) if err != nil { - wp.log.Error("error sending payload over the connection", "broker", msg.Broker, "topics", msg.Topics) - wp.put(res) + for i := 0; i < msg.TopicsLength(); i++ { + wp.log.Error("error sending payload over the connection", "error", err, "broker", utils.AsString(msg.Broker()), "topics", utils.AsString(msg.Topics(i))) + } continue } } + // put bytes buffer back + wp.putBytes(bb) + // put map with results back wp.put(res) case <-wp.exit: wp.log.Info("get exit signal, exiting from the workers pool") |