summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go34
1 files changed, 22 insertions, 12 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 4c0edcad..39a4e139 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -25,7 +25,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -301,16 +301,21 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
+ msg := &message.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ return err
+ }
+
// Get payload
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
return nil
@@ -320,16 +325,21 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ msg := &message.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ p.log.Error("message unmarshal")
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
p.log.Error("publish async error", "error", err)
- return
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
}()