summaryrefslogtreecommitdiff
path: root/plugins/websockets/executor/executor.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/executor/executor.go')
-rw-r--r--plugins/websockets/executor/executor.go31
1 files changed, 19 insertions, 12 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 048a41ed..8db6d9c3 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -39,13 +39,13 @@ func NewExecutor(conn *connection.Connection, log logger.Logger, bst *storage.St
}
}
-func (e *Executor) StartCommandLoop() error {
+func (e *Executor) StartCommandLoop() error { //nolint:gocognit
const op = errors.Op("executor_command_loop")
for {
mt, data, err := e.conn.Read()
if err != nil {
if mt == -1 {
- e.log.Error("socket was closed", "error", err, "message type", mt)
+ e.log.Info("socket was closed", "reason", err, "message type", mt)
return nil
}
@@ -83,10 +83,15 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub[msg.Broker()].Subscribe(msg.Topics()...)
- if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
- continue
+ // subscribe to the topic
+ if br, ok := e.pubsub[msg.Broker()]; ok {
+ err = br.Subscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ // in case of error, unsubscribe connection from the dead topics
+ _ = br.Unsubscribe(msg.Topics()...)
+ continue
+ }
}
// handle leave
@@ -105,18 +110,20 @@ func (e *Executor) StartCommandLoop() error {
continue
}
- err = e.pubsub[msg.Broker()].Unsubscribe(msg.Topics()...)
- if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
- continue
- }
-
err = e.conn.Write(websocket.BinaryMessage, packet)
if err != nil {
e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
continue
}
+ if br, ok := e.pubsub[msg.Broker()]; ok {
+ err = br.Unsubscribe(msg.Topics()...)
+ if err != nil {
+ e.log.Error("error subscribing to the provided topics", "topics", msg.Topics(), "error", err.Error())
+ continue
+ }
+ }
+
case commands.Headers:
default: