diff options
author | Valery Piashchynski <[email protected]> | 2021-05-27 13:26:12 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-05-27 13:26:12 +0300 |
commit | 0a9aea326045e56716f0736f7aa8520305362c51 (patch) | |
tree | 532ca326690d81e97136248dd798d23a56843278 /plugins/websockets | |
parent | 57a30c2b49c36161b3af3e539a8618c2d39a5cc9 (diff) |
- Move bst to the pkg folder
- Add comments
- Fix all golangci-lint warnings
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/connection/connection.go | 2 | ||||
-rw-r--r-- | plugins/websockets/executor/executor.go | 33 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 41 | ||||
-rw-r--r-- | plugins/websockets/storage/storage.go | 2 |
4 files changed, 24 insertions, 54 deletions
diff --git a/plugins/websockets/connection/connection.go b/plugins/websockets/connection/connection.go index 5eb30c61..2b847173 100644 --- a/plugins/websockets/connection/connection.go +++ b/plugins/websockets/connection/connection.go @@ -43,8 +43,6 @@ func (c *Connection) Write(mt int, data []byte) error { } func (c *Connection) Read() (int, []byte, error) { - //c.RLock() - //defer c.RUnlock() const op = errors.Op("websocket_read") mt, data, err := c.conn.ReadMessage() diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 391c9a8c..9ef5e40a 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -22,11 +22,11 @@ type Executor struct { // associated connection ID connID string - pubsub pubsub.PubSub + pubsub map[string]pubsub.PubSub } // NewExecutor creates protected connection and starts command loop -func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor { +func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs map[string]pubsub.PubSub) *Executor { return &Executor{ conn: conn, connID: connID, @@ -58,31 +58,6 @@ func (e *Executor) StartCommandLoop() error { switch msg.Command() { // handle leave case commands.Join: - // TODO access validators model update - //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...) - //// validation error - //if err != nil { - // e.log.Error("validation error", "error", err) - // - // resp := &Response{ - // Topic: "#join", - // Payload: msg.Topics(), - // } - // - // packet, err := json.Marshal(resp) - // if err != nil { - // e.log.Error("error marshal the body", "error", err) - // return err - // } - // - // err = e.conn.Write(websocket.BinaryMessage, packet) - // if err != nil { - // e.log.Error("error writing payload to the connection", "payload", packet, "error", err) - // continue - // } - // - // continue - //} // associate connection with topics e.storage.Store(e.connID, msg.Topics()) @@ -103,7 +78,7 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub.Subscribe(msg.Topics()...) + err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) continue @@ -125,7 +100,7 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub.Unsubscribe(msg.Topics()...) + err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) continue diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index a247da69..bc5028e6 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -58,28 +58,25 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger) error { func (p *Plugin) Serve() chan error { errCh := make(chan error) - go func() { - ps := p.pubsubs["redis"] - - for { - // get message - // get topic - // get connection uuid from the storage by the topic - // write payload into the connection - // do that in the workers pool - data, err := ps.Next() - if err != nil { - errCh <- err - return - } - if data == nil { - continue - } + // run all pubsubs drivers + for _, v := range p.pubsubs { + go func(ps pubsub.PubSub) { + for { + data, err := ps.Next() + if err != nil { + errCh <- err + return + } - p.workersPool.Queue(data) - } - }() + if data == nil { + continue + } + + p.workersPool.Queue(data) + } + }(v) + } return errCh } @@ -156,7 +153,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { // Executor wraps a connection to have a safe abstraction p.Lock() - e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs["redis"]) + e := executor.NewExecutor(safeConn, p.log, p.storage, connectionID, p.pubsubs) p.Unlock() p.log.Info("websocket client connected", "uuid", connectionID) @@ -169,6 +166,7 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { }) } +// Publish is an entry point to the websocket PUBSUB func (p *Plugin) Publish(msg []pubsub.Message) error { p.Lock() defer p.Unlock() @@ -199,7 +197,6 @@ func (p *Plugin) PublishAsync(msg []pubsub.Message) { p.log.Error("publish async error", "error", err) return } - } } }() diff --git a/plugins/websockets/storage/storage.go b/plugins/websockets/storage/storage.go index a7e49207..34f53cfd 100644 --- a/plugins/websockets/storage/storage.go +++ b/plugins/websockets/storage/storage.go @@ -3,7 +3,7 @@ package storage import ( "sync" - "github.com/spiral/roadrunner/v2/plugins/memory/bst" + "github.com/spiral/roadrunner/v2/pkg/bst" ) type Storage struct { |