diff options
Diffstat (limited to 'plugins/websockets/executor/executor.go')
-rw-r--r-- | plugins/websockets/executor/executor.go | 33 |
1 files changed, 4 insertions, 29 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 391c9a8c..9ef5e40a 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -22,11 +22,11 @@ type Executor struct { // associated connection ID connID string - pubsub pubsub.PubSub + pubsub map[string]pubsub.PubSub } // NewExecutor creates protected connection and starts command loop -func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor { +func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs map[string]pubsub.PubSub) *Executor { return &Executor{ conn: conn, connID: connID, @@ -58,31 +58,6 @@ func (e *Executor) StartCommandLoop() error { switch msg.Command() { // handle leave case commands.Join: - // TODO access validators model update - //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...) - //// validation error - //if err != nil { - // e.log.Error("validation error", "error", err) - // - // resp := &Response{ - // Topic: "#join", - // Payload: msg.Topics(), - // } - // - // packet, err := json.Marshal(resp) - // if err != nil { - // e.log.Error("error marshal the body", "error", err) - // return err - // } - // - // err = e.conn.Write(websocket.BinaryMessage, packet) - // if err != nil { - // e.log.Error("error writing payload to the connection", "payload", packet, "error", err) - // continue - // } - // - // continue - //} // associate connection with topics e.storage.Store(e.connID, msg.Topics()) @@ -103,7 +78,7 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub.Subscribe(msg.Topics()...) + err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) continue @@ -125,7 +100,7 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub.Unsubscribe(msg.Topics()...) + err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...) if err != nil { e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) continue |