summaryrefslogtreecommitdiff
path: root/plugins/websockets
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-18 12:38:36 +0300
committerValery Piashchynski <[email protected]>2021-06-18 12:38:36 +0300
commit1229d24e574f9632ea68dea721fe7ed437afbb85 (patch)
tree334eb3c4a02d8ec559af42850fab2fb45325ddf7 /plugins/websockets
parent9e8bad3988c1fec2e545898d529446f7b93e537b (diff)
- Add broadcast tests
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets')
-rw-r--r--plugins/websockets/executor/executor.go25
-rw-r--r--plugins/websockets/plugin.go2
2 files changed, 14 insertions, 13 deletions
diff --git a/plugins/websockets/executor/executor.go b/plugins/websockets/executor/executor.go
index 0583be0c..664b4dfd 100644
--- a/plugins/websockets/executor/executor.go
+++ b/plugins/websockets/executor/executor.go
@@ -22,6 +22,7 @@ type Response struct {
type Executor struct {
sync.Mutex
+ // raw ws connection
conn *connection.Connection
log logger.Logger
@@ -67,20 +68,20 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
err = json.Unmarshal(data, msg)
if err != nil {
- e.log.Error("error unmarshal message", "error", err)
+ e.log.Error("unmarshal message", "error", err)
continue
}
// nil message, continue
if msg == nil {
- e.log.Warn("get nil message, skipping")
+ e.log.Warn("nil message, skipping")
continue
}
switch msg.Command {
// handle leave
case commands.Join:
- e.log.Debug("get join command", "msg", msg)
+ e.log.Debug("received join command", "msg", msg)
val, err := e.accessValidator(e.req, msg.Topics...)
if err != nil {
@@ -95,13 +96,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
packet, errJ := json.Marshal(resp)
if errJ != nil {
- e.log.Error("error marshal the body", "error", errJ)
+ e.log.Error("marshal the body", "error", errJ)
return errors.E(op, fmt.Errorf("%v,%v", err, errJ))
}
errW := e.conn.Write(packet)
if errW != nil {
- e.log.Error("error writing payload to the connection", "payload", packet, "error", errW)
+ e.log.Error("write payload to the connection", "payload", packet, "error", errW)
return errors.E(op, fmt.Errorf("%v,%v", err, errW))
}
@@ -115,13 +116,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
packet, err := json.Marshal(resp)
if err != nil {
- e.log.Error("error marshal the body", "error", err)
+ e.log.Error("marshal the body", "error", err)
return errors.E(op, err)
}
err = e.conn.Write(packet)
if err != nil {
- e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ e.log.Error("write payload to the connection", "payload", packet, "error", err)
return errors.E(op, err)
}
@@ -133,7 +134,7 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
// handle leave
case commands.Leave:
- e.log.Debug("get leave command", "msg", msg)
+ e.log.Debug("received leave command", "msg", msg)
// prepare response
resp := &Response{
@@ -143,13 +144,13 @@ func (e *Executor) StartCommandLoop() error { //nolint:gocognit
packet, err := json.Marshal(resp)
if err != nil {
- e.log.Error("error marshal the body", "error", err)
+ e.log.Error("marshal the body", "error", err)
return errors.E(op, err)
}
err = e.conn.Write(packet)
if err != nil {
- e.log.Error("error writing payload to the connection", "payload", packet, "error", err)
+ e.log.Error("write payload to the connection", "payload", packet, "error", err)
return errors.E(op, err)
}
@@ -170,7 +171,7 @@ func (e *Executor) Set(topics []string) error {
// associate connection with topics
err := e.sub.Subscribe(e.connID, topics...)
if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
+ e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error())
// in case of error, unsubscribe connection from the dead topics
_ = e.sub.Unsubscribe(e.connID, topics...)
return err
@@ -188,7 +189,7 @@ func (e *Executor) Leave(topics []string) error {
// remove associated connections from the storage
err := e.sub.Unsubscribe(e.connID, topics...)
if err != nil {
- e.log.Error("error subscribing to the provided topics", "topics", topics, "error", err.Error())
+ e.log.Error("subscribe to the provided topics", "topics", topics, "error", err.Error())
return err
}
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index f0b7c6c3..ca5f2f59 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -123,7 +123,7 @@ func (p *Plugin) Serve() chan error {
p.workersPool = pool.NewWorkersPool(p.subReader, &p.connections, p.log)
- // run all pubsubs drivers
+ // we need here only Reader part of the interface
go func(ps pubsub.Reader) {
for {
select {