diff options
Diffstat (limited to 'plugins/websockets/executor/executor.go')
-rw-r--r-- | plugins/websockets/executor/executor.go | 31 |
1 files changed, 19 insertions, 12 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 048a41ed..8db6d9c3 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -39,13 +39,13 @@ func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.St } } -func (e *Executor) StartCommandLoop() error { +func (e *Executor) StartCommandLoop() error { //nolint:gocognit const op = errors.Op("executor_command_loop") for { mt, data, err := e.conn.Read() if err != nil { if mt == -1 { - e.log.Error("socket was closed", "error", err, "message type", mt) + e.log.Info("socket was closed", "reason", err, "message type", mt) return nil } @@ -83,10 +83,15 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...) - if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) - continue + // subscribe to the topic + if br, ok := e.pubsub[msg.Broker()]; ok { + err = br.Subscribe(msg.Topics()...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) + // in case of error, unsubscribe connection from the dead topics + _ = br.Unsubscribe(msg.Topics()...) + continue + } } // handle leave @@ -105,18 +110,20 @@ func (e *Executor) StartCommandLoop() error { continue } - err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...) - if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) - continue - } - err = e.conn.Write(websocket.BinaryMessage, packet) if err != nil { e.log.Error("error writing payload to the connection", "payload", packet, "error", err) continue } + if br, ok := e.pubsub[msg.Broker()]; ok { + err = br.Unsubscribe(msg.Topics()...) + if err != nil { + e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error()) + continue + } + } + case commands.Headers: default: |