summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
committerValery Piashchynski <[email protected]>2021-06-02 17:25:09 +0300
commit12c031ce76c505128ebf9daafa91952855f202d4 (patch)
tree51846c0cd8a452246e383deb2ac00cce9ef1b92c /plugins/websockets/plugin.go
parent352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (diff)
- Switch from the json to flatbuffers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go55
1 files changed, 25 insertions, 30 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 16cde0cc..fe55d30e 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -227,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.log.Error("command loop error, disconnecting", "error", err.Error())
return
}
-
p.log.Info("disconnected", "connectionID", connectionID)
})
}
@@ -291,41 +290,37 @@ func (p *Plugin) Publish(m []byte) error {
defer p.Unlock()
// Get payload
- fbsMsg := message.GetRootAsMessages(m, 0)
- tmpMsg := &message.Message{}
-
- for i := 0; i < fbsMsg.MessagesLength(); i++ {
- fbsMsg.Messages(tmpMsg, i)
-
- for j := 0; j < tmpMsg.TopicsLength(); j++ {
- if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok {
- table := tmpMsg.Table()
- err := br.Publish(table.ByteVector(0))
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker())
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ return errors.E(err)
}
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []byte) {
- //go func() {
- // p.Lock()
- // defer p.Unlock()
- // for i := 0; i < len(msg); i++ {
- // for j := 0; j < len(msg[i].Topics); j++ {
- // err := p.pubsubs[msg[i].Broker].Publish(msg)
- // if err != nil {
- // p.log.Error("publish async error", "error", err)
- // return
- // }
- // }
- // }
- //}()
+func (p *Plugin) PublishAsync(m []byte) {
+ go func() {
+ p.Lock()
+ defer p.Unlock()
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ p.log.Error("publish async error", "error", err)
+ return
+ }
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ }
+ }
+ }()
}
func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn {