summaryrefslogtreecommitdiff
path: root/plugins/websockets/plugin.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/websockets/plugin.go')
-rw-r--r--plugins/websockets/plugin.go40
1 files changed, 27 insertions, 13 deletions
diff --git a/plugins/websockets/plugin.go b/plugins/websockets/plugin.go
index 4c0edcad..6ddd609c 100644
--- a/plugins/websockets/plugin.go
+++ b/plugins/websockets/plugin.go
@@ -14,8 +14,8 @@ import (
"github.com/spiral/roadrunner/v2/pkg/payload"
phpPool "github.com/spiral/roadrunner/v2/pkg/pool"
"github.com/spiral/roadrunner/v2/pkg/process"
+ websocketsv1 "github.com/spiral/roadrunner/v2/pkg/proto/websockets/v1beta"
"github.com/spiral/roadrunner/v2/pkg/pubsub"
- "github.com/spiral/roadrunner/v2/pkg/pubsub/message"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/spiral/roadrunner/v2/plugins/config"
"github.com/spiral/roadrunner/v2/plugins/http/attributes"
@@ -23,9 +23,10 @@ import (
"github.com/spiral/roadrunner/v2/plugins/server"
"github.com/spiral/roadrunner/v2/plugins/websockets/connection"
"github.com/spiral/roadrunner/v2/plugins/websockets/executor"
+ "github.com/spiral/roadrunner/v2/plugins/websockets/memory"
"github.com/spiral/roadrunner/v2/plugins/websockets/pool"
"github.com/spiral/roadrunner/v2/plugins/websockets/validator"
- "github.com/spiral/roadrunner/v2/utils"
+ "google.golang.org/protobuf/proto"
)
const (
@@ -79,6 +80,9 @@ func (p *Plugin) Init(cfg config.Configurer, log logger.Logger, server server.Se
p.serveExit = make(chan struct{})
p.server = server
+ // attach default driver
+ p.pubsubs["memory"] = memory.NewInMemory(p.log)
+
return nil
}
@@ -301,16 +305,21 @@ func (p *Plugin) Publish(m []byte) error {
p.Lock()
defer p.Unlock()
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ return err
+ }
+
// Get payload
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
return errors.E(err)
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
return nil
@@ -320,16 +329,21 @@ func (p *Plugin) PublishAsync(m []byte) {
go func() {
p.Lock()
defer p.Unlock()
- fbsMsg := message.GetRootAsMessage(m, 0)
- for i := 0; i < fbsMsg.TopicsLength(); i++ {
- if br, ok := p.pubsubs[utils.AsString(fbsMsg.Broker())]; ok {
- err := br.Publish(fbsMsg.Table().Bytes)
+ msg := &websocketsv1.Message{}
+ err := proto.Unmarshal(m, msg)
+ if err != nil {
+ p.log.Error("message unmarshal")
+ }
+
+ // Get payload
+ for i := 0; i < len(msg.GetTopics()); i++ {
+ if br, ok := p.pubsubs[msg.GetBroker()]; ok {
+ err := br.Publish(m)
if err != nil {
p.log.Error("publish async error", "error", err)
- return
}
} else {
- p.log.Warn("no such broker", "available", p.pubsubs, "requested", fbsMsg.Broker())
+ p.log.Warn("no such broker", "available", p.pubsubs, "requested", msg.GetBroker())
}
}
}()