From 0a9aea326045e56716f0736f7aa8520305362c51 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Thu, 27 May 2021 13:26:12 +0300 Subject: - Move bst to the pkg folder - Add comments - Fix all golangci-lint warnings Signed-off-by: Valery Piashchynski --- plugins/websockets/executor/executor.go | 33 ++++----------------------------- 1 file changed, 4 insertions(+), 29 deletions(-) (limited to 'plugins/websockets/executor/executor.go') diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 391c9a8c..9ef5e40a 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -22,11 +22,11 @@ type Executor struct { // associated connection ID connID string - pubsub pubsub.PubSub + pubsub map[string]pubsub.PubSub } // NewExecutor creates protected connection and starts command loop -func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor { +func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs map[string]pubsub.PubSub) *Executor { return &Executor{ conn: conn, connID: connID, @@ -58,31 +58,6 @@ func (e *Executor) StartCommandLoop() error { switch msg.Command() { // handle leave case commands.Join: - // TODO access validators model update - //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...) - //// validation error - //if err != nil { - // e.log.Error("validation error", "error", err) - // - // resp := &Response{ - // Topic: "#join", - // Payload: msg.Topics(), - // } - // - // packet, err := json.Marshal(resp) - // if err != nil { - // e.log.Error("error marshal the body", "error", err) - // return err - // } - // - // err = e.conn.Write(websocket.BinaryMessage, packet) - // if err != nil { - // e.log.Error("error writing payload to the connection", "payload", packet, "error", err) - // continue - // } - // - // continue - //} // associate connection with topics e.storage.Store(e.connID, msg.Topics()) @@ -103,7 +78,7 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub.Subscribe(msg.Topics()...) + err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) continue @@ -125,7 +100,7 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub.Unsubscribe(msg.Topics()...) + err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) continue -- cgit v1.2.3