diff options
author | Valery Piashchynski <[email protected]> | 2021-06-18 12:38:36 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-18 12:38:36 +0300 |
commit | 1229d24e574f9632ea68dea721fe7ed437afbb85 (patch) | |
tree | 334eb3c4a02d8ec559af42850fab2fb45325ddf7 /plugins/websockets | |
parent | 9e8bad3988c1fec2e545898d529446f7b93e537b (diff) |
- Add broadcast tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r-- | plugins/websockets/executor/executor.go | 25 | ||||
-rw-r--r-- | plugins/websockets/plugin.go | 2 |
2 files changed, 14 insertions, 13 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go index 0583be0c..664b4dfd 100644 --- a/plugins/websockets/executor/executor.go +++ b/plugins/websockets/executor/executor.go @@ -22,6 +22,7 @@ type Response struct { type Executor struct { sync.Mutex + // raw ws connection conn *connection.Connection log logger.Logger @@ -67,20 +68,20 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit err = json.Unmarshal(data, msg) if err != nil { - e.log.Error("error unmarshal message", "error", err) + e.log.Error("unmarshal message", "error", err) continue } // nil message, continue if msg == nil { - e.log.Warn("get nil message, skipping") + e.log.Warn("nil message, skipping") continue } switch msg.Command { // handle leave case commands.Join: - e.log.Debug("get join command", "msg", msg) + e.log.Debug("received join command", "msg", msg) val, err := e.accessValidator(e.req, msg.Topics...) if err != nil { @@ -95,13 +96,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, errJ := json.Marshal(resp) if errJ != nil { - e.log.Error("error marshal the body", "error", errJ) + e.log.Error("marshal the body", "error", errJ) return errors.E(op, fmt.Errorf("%v,%v", err, errJ)) } errW := e.conn.Write(packet) if errW != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", errW) + e.log.Error("write payload to the connection", "payload", packet, "error", errW) return errors.E(op, fmt.Errorf("%v,%v", err, errW)) } @@ -115,13 +116,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, err := json.Marshal(resp) if err != nil { - e.log.Error("error marshal the body", "error", err) + e.log.Error("marshal the body", "error", err) return errors.E(op, err) } err = e.conn.Write(packet) if err != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + e.log.Error("write payload to the connection", "payload", packet, "error", err) return errors.E(op, err) } @@ -133,7 +134,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit // handle leave case commands.Leave: - e.log.Debug("get leave command", "msg", msg) + e.log.Debug("received leave command", "msg", msg) // prepare response resp := &Response{ @@ -143,13 +144,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit packet, err := json.Marshal(resp) if err != nil { - e.log.Error("error marshal the body", "error", err) + e.log.Error("marshal the body", "error", err) return errors.E(op, err) } err = e.conn.Write(packet) if err != nil { - e.log.Error("error writing payload to the connection", "payload", packet, "error", err) + e.log.Error("write payload to the connection", "payload", packet, "error", err) return errors.E(op, err) } @@ -170,7 +171,7 @@ func (e *Executor) Set(topics []string) error { // associate connection with topics err := e.sub.Subscribe(e.connID, topics...) if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) // in case of error, unsubscribe connection from the dead topics _ = e.sub.Unsubscribe(e.connID, topics...) return err @@ -188,7 +189,7 @@ func (e *Executor) Leave(topics []string) error { // remove associated connections from the storage err := e.sub.Unsubscribe(e.connID, topics...) if err != nil { - e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error()) + e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error()) return err } diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index f0b7c6c3..ca5f2f59 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -123,7 +123,7 @@ func (p *Plugin) Serve() chan error { p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log) - // run all pubsubs drivers + // we need here only Reader part of the interface go func(ps pubsub.Reader) { for { select { |