summaryrefslogtreecommitdiff
path: root/plugins/websockets/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/executor/executor.go')
-rw-r--r--plugins/websockets/executor/executor.go33
1 files changed, 4 insertions, 29 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 391c9a8c..9ef5e40a 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -22,11 +22,11 @@ type Executor struct {
// associated connection ID
connID string
- pubsub pubsub.PubSub
+ pubsub map[string]pubsub.PubSub
}
// NewExecutor creates protected connection and starts command loop
-func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs pubsub.PubSub) *Executor {
+func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.Storage, connID string, pubsubs map[string]pubsub.PubSub) *Executor {
return &Executor{
conn: conn,
connID: connID,
@@ -58,31 +58,6 @@ func (e *Executor) StartCommandLoop() error {
switch msg.Command() {
// handle leave
case commands.Join:
- // TODO access validators model update
- //err := validator.NewValidator().AssertTopicsAccess(e.handler, e.httpRequest, msg.Topics()...)
- //// validation error
- //if err != nil {
- // e.log.Error("validation error", "error", err)
- //
- // resp := &Response{
- // Topic: "#join",
- // Payload: msg.Topics(),
- // }
- //
- // packet, err := json.Marshal(resp)
- // if err != nil {
- // e.log.Error("error marshal the body", "error", err)
- // return err
- // }
- //
- // err = e.conn.Write(websocket.BinaryMessage, packet)
- // if err != nil {
- // e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
- // continue
- // }
- //
- // continue
- //}
// associate connection with topics
e.storage.Store(e.connID, msg.Topics())
@@ -103,7 +78,7 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub.Subscribe(msg.Topics()...)
+ err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
continue
@@ -125,7 +100,7 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub.Unsubscribe(msg.Topics()...)
+ err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...)
if err != nil {
e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
continue