diff options
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r-- | plugins/websockets/plugin.go | 34 |
1 files changed, 19 insertions, 15 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go index 9b21ff8f..fe55d30e 100644 --- a/plugins/websockets/plugin.go +++ b/plugins/websockets/plugin.go @@ -15,6 +15,7 @@ import ( phpPool "github.com/spiral/roadrunner/v2/pkg/pool" "github.com/spiral/roadrunner/v2/pkg/process" "github.com/spiral/roadrunner/v2/pkg/pubsub" + "github.com/spiral/roadrunner/v2/pkg/pubsub/message" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/spiral/roadrunner/v2/plugins/config" "github.com/spiral/roadrunner/v2/plugins/http/attributes" @@ -25,6 +26,7 @@ import ( "github.com/spiral/roadrunner/v2/plugins/websockets/pool" "github.com/spiral/roadrunner/v2/plugins/websockets/storage" "github.com/spiral/roadrunner/v2/plugins/websockets/validator" + "github.com/spiral/roadrunner/v2/utils" ) const ( @@ -225,7 +227,6 @@ func (p *Plugin) Middleware(next http.Handler) http.Handler { p.log.Error("command loop error, disconnecting", "error", err.Error()) return } - p.log.Info("disconnected", "connectionID", connectionID) }) } @@ -284,36 +285,39 @@ func (p *Plugin) Reset() error { } // Publish is an entry point to the websocket PUBSUB -func (p *Plugin) Publish(msg []*pubsub.Message) error { +func (p *Plugin) Publish(m []byte) error { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - if br, ok := p.pubsubs[msg[i].Broker]; ok { - err := br.Publish(msg) - if err != nil { - return errors.E(err) - } - } else { - p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg[i].Broker) + // Get payload + fbsMsg := message.GetRootAsMessage(m, 0) + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { + err := br.Publish(fbsMsg.Table().Bytes) + if err != nil { + return errors.E(err) } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) } } return nil } -func (p *Plugin) PublishAsync(msg []*pubsub.Message) { +func (p *Plugin) PublishAsync(m []byte) { go func() { p.Lock() defer p.Unlock() - for i := 0; i < len(msg); i++ { - for j := 0; j < len(msg[i].Topics); j++ { - err := p.pubsubs[msg[i].Broker].Publish(msg) + fbsMsg := message.GetRootAsMessage(m, 0) + for i := 0; i < fbsMsg.TopicsLength(); i++ { + if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok { + err := br.Publish(fbsMsg.Table().Bytes) if err != nil { p.log.Error("publish async error", "error", err) return } + } else { + p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker()) } } }() |