summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go34
1 files changed, 19 insertions, 15 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 9b21ff8f..fe55d30e 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -15,6 +15,7 @@ import (
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
+ "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -25,6 +26,7 @@ import (
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/storage"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
+ "github.com/spiral/roadrunner/v2/utils"
)
const (
@@ -225,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler {
p.log.Error("command loop error, disconnecting", "error", err.Error())
return
}
-
p.log.Info("disconnected", "connectionID", connectionID)
})
}
@@ -284,36 +285,39 @@ func (p *Plugin) Reset() error {
}
// Publish is an entry point to the websocket PUBSUB
-func (p *Plugin) Publish(msg []*pubsub.Message) error {
+func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- if br, ok := p.pubsubs[msg[i].Broker]; ok {
- err := br.Publish(msg)
- if err != nil {
- return errors.E(err)
- }
- } else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker)
+ // Get payload
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
+ if err != nil {
+ return errors.E(err)
}
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
}
}
return nil
}
-func (p *Plugin) PublishAsync(msg []*pubsub.Message) {
+func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- for i := 0; i < len(msg); i++ {
- for j := 0; j < len(msg[i].Topics); j++ {
- err := p.pubsubs[msg[i].Broker].Publish(msg)
+ fbsMsg := message.GetRootAsMessage(m, 0)
+ for i := 0; i < fbsMsg.TopicsLength(); i++ {
+ if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
+ err := br.Publish(fbsMsg.Table().Bytes)
if err != nil {
p.log.Error("publish async error", "error", err)
return
}
+ } else {
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
}
}
}()