diff options
author | Valery Piashchynski <[email protected]> | 2021-06-02 17:25:09 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-02 17:25:09 +0300 |
commit | 12c031ce76c505128ebf9daafa91952855f202d4 (patch) | |
tree | 51846c0cd8a452246e383deb2ac00cce9ef1b92c /plugins/websockets/plugin.go | |
parent | 352b0f7cfcc1beaeb4d66777f30732f4003ce6d2 (diff) |
- Switch from the json to flatbuffers
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r-- | plugins/websockets/plugin.go | 55 |
1 files changed, 25 insertions, 30 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 16cde0cc..fe55d30e 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -227,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { p.log.Error("command loop error, disconnecting", "error", err.Error()) return } - p.log.Info("disconnected", "connectionID", connectionID) }) } @@ -291,41 +290,37 @@ func (p *Plugin) Publish(m []byte) error { defer p.Unlock() // Get payload - fbsMsg := message.GetRootAsMessages(m, 0) - tmpMsg := &message.Message{} - - for i := 0; i < fbsMsg.MessagesLength(); i++ { - fbsMsg.Messages(tmpMsg, i) - - for j := 0; j < tmpMsg.TopicsLength(); j++ { - if br, ok := p.pubsubs[utils.AsString(tmpMsg.Broker())]; ok { - table := tmpMsg.Table() - err := br.Publish(table.ByteVector(0)) - if err != nil { - return errors.E(err) - } - } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", tmpMsg.Broker()) + fbsMsg := message.GetRootAsMessage(m, 0) + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { + err := br.Publish(fbsMsg.Table().Bytes) + if err != nil { + return errors.E(err) } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) } } return nil } -func (p *Plugin) PublishAsync(msg []byte) { - //go func() { - // p.Lock() - // defer p.Unlock() - // for i := 0; i < len(msg); i++ { - // for j := 0; j < len(msg[i].Topics); j++ { - // err := p.pubsubs[msg[i].Broker].Publish(msg) - // if err != nil { - // p.log.Error("publish async error", "error", err) - // return - // } - // } - // } - //}() +func (p *Plugin) PublishAsync(m []byte) { + go func() { + p.Lock() + defer p.Unlock() + fbsMsg := message.GetRootAsMessage(m, 0) + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { + err := br.Publish(fbsMsg.Table().Bytes) + if err != nil { + p.log.Error("publish async error", "error", err) + return + } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) + } + } + }() } func (p *Plugin) defaultAccessValidator(pool phpPool.Pool) validator.AccessValidatorFn { |